diff --git a/CMakeLists.txt b/CMakeLists.txt
index b0a3156..33f03cc 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -72,6 +72,13 @@ if(THRIFT_FOUND)
     list(APPEND HUSKY_EXTERNAL_DEFINITION ${THRIFT_DEFINITION})
 endif(THRIFT_FOUND)
 
+# ORC
+if(ORC_FOUND)
+    list(APPEND HUSKY_EXTERNAL_INCLUDE ${ORC_INCLUDE_DIR})
+    list(APPEND HUSKY_EXTERNAL_LIB ${ORC_LIBRARY})
+    list(APPEND HUSKY_EXTERNAL_DEFINITION ${ORC_DEFINITION})
+endif(ORC_FOUND)
+
 if(WIN32)
     list(APPEND HUSKY_EXTERNAL_LIB wsock32 ws2_32)
 endif()
diff --git a/cmake/dep.cmake b/cmake/dep.cmake
index a419220..810f915 100644
--- a/cmake/dep.cmake
+++ b/cmake/dep.cmake
@@ -135,3 +135,36 @@ if(WITHOUT_THRIFT)
     unset(THRIFT_FOUND)
     message(STATUS "Not using Thrift due to WITHOUT_THRIFT option")
 endif(WITHOUT_THRIFT)
+
+### ORC ###
+
+# NAMES liblz4.a liborc.a libprotobuf.a libsnappy.a libz.a
+# NAMES ColumnPrinter.hh Int128.hh MemoryPool.hh orc-config.hh OrcFile.hh Reader.hh Type.hh Vector.hh
+find_path(ORC_INCLUDE_DIR NAMES orc/OrcFile.hh)
+find_library(ORC_L0 NAMES protobuf NO_CMAKE_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(ORC_L1 NAMES z NO_CMAKE_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(ORC_L2 NAMES lz4 NO_CMAKE_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(ORC_L3 NAMES snappy NO_CMAKE_ENVIRONMENT_PATH NO_CMAKE_SYSTEM_PATH NO_SYSTEM_ENVIRONMENT_PATH)
+find_library(ORC_L4 NAMES orc)
+
+if(ORC_INCLUDE_DIR AND ORC_L0 AND ORC_L1 AND ORC_L2 AND ORC_L3 AND ORC_L4)
+    set(ORC_FOUND true)
+endif(ORC_INCLUDE_DIR AND ORC_L0 AND ORC_L1 AND ORC_L2 AND ORC_L3 AND ORC_L4)
+if(ORC_FOUND)
+    set(ORC_DEFINITION "-DWITH_ORC")
+    # The order is important for dependencies: orc first, then its codec and protobuf deps.
+    set(ORC_LIBRARY ${ORC_L4} ${ORC_L3} ${ORC_L2} ${ORC_L1} ${ORC_L0})
+    message(STATUS "Found ORC:")
+    message(STATUS "  (Headers)  ${ORC_INCLUDE_DIR}")
+    message(STATUS "  (Library)  ${ORC_L0}")
+    message(STATUS "  (Library)  ${ORC_L1}")
+    message(STATUS "  (Library)  ${ORC_L2}")
+    message(STATUS "  (Library)  ${ORC_L3}")
+    message(STATUS "  (Library)  ${ORC_L4}")
+else(ORC_FOUND)
+    message(STATUS "Could NOT find ORC")
+endif(ORC_FOUND)
+if(WITHOUT_ORC)
+    unset(ORC_FOUND)
+    message(STATUS "Not using ORC due to WITHOUT_ORC option")
+endif(WITHOUT_ORC)
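Note: the detection above relies on CMake's standard search variables, so a build can point the checks at a non-system ORC install. A minimal sketch (paths are illustrative, not part of this patch):

    # headers under /opt/orc/include, libraries under /opt/orc/lib
    cmake -DCMAKE_PREFIX_PATH=/opt/orc ..
    # or opt out explicitly even when ORC is installed
    cmake -DWITHOUT_ORC=1 ..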
diff --git a/core/constants.hpp b/core/constants.hpp
index 78714d8..57d6d65 100644
--- a/core/constants.hpp
+++ b/core/constants.hpp
@@ -73,6 +73,7 @@ const uint32_t TYPE_KAFKA_END_REQ = 0xfa091344;
 const uint32_t TYPE_MONGODB_REQ = 0xfa091388;
 const uint32_t TYPE_MONGODB_END_REQ = 0xfa091389;
 const uint32_t TYPE_LOCAL_BLK_REQ = 0xfa0e12a2;
+const uint32_t TYPE_ORC_BLK_REQ = 0xfa2e32a1;
 const uint32_t TYPE_STOP_ASYNC_REQ = 0xf89d74b4;
 const uint32_t TYPE_STOP_ASYNC_YES = 0x09b8ab2b;
 const uint32_t TYPE_STOP_ASYNC_NO = 0x192a241a;
diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt
index 99c2138..fb8fc43 100644
--- a/examples/CMakeLists.txt
+++ b/examples/CMakeLists.txt
@@ -61,6 +61,12 @@ target_link_libraries(WordCountFlume ${husky})
 target_link_libraries(WordCountFlume ${HUSKY_EXTERNAL_LIB})
 husky_default_properties(WordCountFlume)
 
+# WordCountORC
+add_executable(WordCountORC wc_mr_orc.cpp)
+target_link_libraries(WordCountORC ${husky})
+target_link_libraries(WordCountORC ${HUSKY_EXTERNAL_LIB})
+husky_default_properties(WordCountORC)
+
 # PI
 add_executable(PI pi.cpp)
 target_link_libraries(PI ${husky})
@@ -120,3 +126,9 @@ add_executable(AMAT affinity.cpp)
 target_link_libraries(AMAT ${husky})
 target_link_libraries(AMAT ${HUSKY_EXTERNAL_LIB})
 husky_default_properties(AMAT)
+
+# Cube
+add_executable(Cube cube.cpp)
+target_link_libraries(Cube ${husky})
+target_link_libraries(Cube ${HUSKY_EXTERNAL_LIB})
+husky_default_properties(Cube)
diff --git a/examples/cube.cpp b/examples/cube.cpp
new file mode 100644
index 0000000..d02e59f
--- /dev/null
+++ b/examples/cube.cpp
@@ -0,0 +1,627 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <algorithm>
+#include <climits>
+#include <map>
+#include <memory>
+#include <stack>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "boost/algorithm/string.hpp"
+#include "boost/tokenizer.hpp"
+
+#include "core/engine.hpp"
+#include "io/hdfs_manager.hpp"
+#include "io/input/inputformat_store.hpp"
+#include "lib/aggregator_factory.hpp"
+
+typedef boost::tokenizer<boost::char_separator<char>> Tokenizer;
+typedef std::map<int, int> DimMap;
+typedef std::pair<int, int> Pair;
+typedef std::pair<std::string, std::string> Filter;
+typedef std::map<int, Filter> FilterMap;
+typedef std::vector<int> AttrIdx;
+typedef std::vector<std::string> Tuple;
+typedef std::vector<Tuple> TupleVector;
+typedef TupleVector::iterator TVIterator;
+
+thread_local std::string ghost;
+thread_local std::string gport;
+thread_local std::string ghdfs_dest;
+thread_local int gpart_factor;
+
+using husky::PushCombinedChannel;
+using husky::lib::Aggregator;
+
+/**
+ Basic object type for the main list_execute,
+ which is an instance of a node in the cube
+*/
+class Group {
+ public:
+    using KeyT = std::string;
+
+    Group() = default;
+    explicit Group(const KeyT& t) : key(t) {}
+
+    const KeyT& id() { return key; }
+    KeyT key;
+};
+
+/**
+ A node in the cube lattice,
+ or BUC processing tree
+*/
+class TreeNode {
+ public:
+    TreeNode() = default;
+    explicit TreeNode(AttrIdx&& key) : key_(std::move(key)) { visit = false; }
+
+    explicit TreeNode(const AttrIdx& key) : key_(key) { visit = false; }
+
+    ~TreeNode() = default;
+
+    bool visit;
+
+    const AttrIdx& Key() { return key_; }
+
+    std::vector<std::shared_ptr<TreeNode>>& Children() { return children_; }
+
+    void add_child(std::shared_ptr<TreeNode> child) { children_.push_back(child); }
+
+ private:
+    AttrIdx key_;
+    std::vector<std::shared_ptr<TreeNode>> children_;
+};
+
+class AttrSet {
+ public:
+    AttrSet() = default;
+
+    AttrSet(AttrIdx&& key, DimMap&& mapping) : key_(std::move(key)), map_(std::move(mapping)) {}
+
+    bool has(int attr) const { return (std::find(key_.begin(), key_.end(), attr) != key_.end()); }
+
+    size_t size() const { return key_.size(); }
+
+    int operator[](int attr) const { return map_.at(attr); }
+
+    const AttrIdx& get_attridx() const { return key_; }
+
+    const DimMap& get_map() const { return map_; }
+
+ private:
+    AttrIdx key_;
+    DimMap map_;
+};
+
+struct PairSumCombiner {
+    static void combine(Pair& val, Pair const& inc) {
+        val.first += inc.first;
+        val.second += inc.second;
+    }
+};
+
+bool is_parent(std::shared_ptr<TreeNode> parent, std::shared_ptr<TreeNode> child) {
+    auto child_key = child->Key();
+    for (auto& col : parent->Key()) {
+        if (std::find(child_key.begin(), child_key.end(), col) == child_key.end()) {
+            return false;
+        }
+    }
+    return true;
+}
+
+std::string print_key(const AttrIdx& key) {
+    std::string out;
+    for (auto& i : key) {
+        out = out + std::to_string(i) + " ";
+    }
+    return out;
+}
+
+void measure(const Tuple& key_value, const AttrIdx& group_set, const AttrIdx& select, const AttrSet& key_attributes,
+             const AttrSet& msg_attributes, const int uid_dim, TVIterator begin, TVIterator end,
+             PushCombinedChannel<Pair, Group, PairSumCombiner>& post_ch, Aggregator<int>& num_write) {
+    int count = end - begin;
+    std::sort(begin, end, [uid_dim](const Tuple& a, const Tuple& b) { return a[uid_dim] < b[uid_dim]; });
+    int unique = 1;
+    for (TVIterator it = begin; it != end; ++it) {
+        TVIterator next_it = it + 1;
+        if (next_it != end && (*it)[uid_dim] != (*next_it)[uid_dim]) {
+            ++unique;
+        }
+    }
+
+    // Output
+    std::string out;
+    for (auto& attr : select) {
+        // If attribute is in key,
+        //     output key value.
+        // Else,
+        //     If attribute is in group,
+        //         output attribute in the tuple.
+        //     Else,
+        //         output *
+        if (key_attributes.has(attr)) {
+            out = out + key_value[key_attributes[attr]] + "\t";
+        } else {
+            if (std::find(group_set.begin(), group_set.end(), attr) != group_set.end()) {
+                out = out + (*begin)[msg_attributes[attr]] + "\t";
+            } else {
+                out += "*\t";
+            }
+        }
+    }
+
+    if (gpart_factor == 1) {
+        out = out + std::to_string(count) + "\t" + std::to_string(unique) + "\n";
+        num_write.update(1);
+        std::string hdfs_dest = ghdfs_dest + "/" + key_value.back();
+        husky::io::HDFS::Write(ghost, gport, out, hdfs_dest, husky::Context::get_global_tid());
+    } else {
+        out += key_value.back();
+        post_ch.push(Pair(count, unique), out);
+    }
+}
+
+int next_partition_dim(const AttrIdx& parent_key, const AttrIdx& child_key, const DimMap& dim_map) {
+    for (auto& attr : child_key) {
+        if (std::find(parent_key.begin(), parent_key.end(), attr) == parent_key.end()) {
+            return dim_map.at(attr);
+        }
+    }
+    // error
+    return -1;
+}
+
+// Partition the table according to the value at the 'dim'-th column
+void partition(TVIterator begin, TVIterator end, const int dim, std::vector<int>& out_partition_result) {
+    std::sort(begin, end, [dim](const Tuple& a, const Tuple& b) { return a[dim] < b[dim]; });
+    int i = 0;
+    // Store the size of each partition
+    out_partition_result.resize(1);
+    TVIterator next_tuple;
+    for (TVIterator it = begin; it != end; ++it) {
+        out_partition_result[i]++;
+        next_tuple = it + 1;
+        // If the value of the next row differs at the dim-th column,
+        // start a new partition
+        if (next_tuple != end && (*it)[dim] != (*next_tuple)[dim]) {
+            ++i;
+            out_partition_result.resize(i + 1);
+        }
+    }
+}
+
+void BUC(std::shared_ptr<TreeNode> cur_node, TupleVector& table, const Tuple& key_value, const AttrIdx& select,
+         const AttrSet& key_attributes, const AttrSet& msg_attributes, const int uid_dim, const int dim,
+         TVIterator begin, TVIterator end, PushCombinedChannel<Pair, Group, PairSumCombiner>& post_ch,
+         Aggregator<int>& num_write) {
+    // Measure the current group
+    measure(key_value, cur_node->Key(), select, key_attributes, msg_attributes, uid_dim, begin, end, post_ch,
+            num_write);
+
+    // Process each child
+    for (auto& child : cur_node->Children()) {
+        // Partition the table by the next column
+        int next_dim = next_partition_dim(cur_node->Key(), child->Key(), msg_attributes.get_map());
+        if (next_dim == -1) {
+            throw husky::base::HuskyException("Cannot find next partition dimension from " +
+                                              print_key(cur_node->Key()) + " to " + print_key(child->Key()));
+        }
+        std::vector<int> next_partition_result = {};
+        partition(begin, end, next_dim, next_partition_result);
+        // Perform BUC on each partition
+        TVIterator k = begin;
+        for (int i = 0; i < next_partition_result.size(); ++i) {
+            int count = next_partition_result[i];
+            BUC(child, table, key_value, select, key_attributes, msg_attributes, uid_dim, next_dim, k, k + count,
+                post_ch, num_write);
+            k += count;
+        }
+    }
+}
+
+bool is_operator(const char& c) { return (c == '<' || c == '>' || c == '='); }
+
+void parse_group_set(const std::string& group_filter, const Tokenizer& schema_tok,
+                     std::vector<std::shared_ptr<TreeNode>>& out_roots, std::vector<FilterMap>& out_filters) {
+    boost::char_separator<char> vbar_sep("|");
+    boost::char_separator<char> comma_sep(",");
+    boost::char_separator<char> colon_sep(":");
+
+    Tokenizer group_filter_tok(group_filter, vbar_sep);
+    Tokenizer::iterator gf_it = group_filter_tok.begin();
+
+    /**
+     * Process group sets
+     */
+    Tokenizer group_set_tok(*gf_it, colon_sep);
+    std::shared_ptr<TreeNode> root;
+    int min_lv = INT_MAX;
+    int max_lv = INT_MIN;
+
+    std::unordered_map<int, std::vector<std::shared_ptr<TreeNode>>> tree_map;
+    size_t group_set_size = std::distance(group_set_tok.begin(), group_set_tok.end());
+    for (auto& group : group_set_tok) {
+        // Encode and construct the key of the node
+        Tokenizer column_tok(group, comma_sep);
+        AttrIdx tree_key = {};
+        for (auto column : column_tok) {
+            auto it = std::find(schema_tok.begin(), schema_tok.end(), column);
+            if (it != schema_tok.end()) {
+                tree_key.push_back(std::distance(schema_tok.begin(), it));
+            } else {
+                throw husky::base::HuskyException("Invalid schema or group sets");
+            }
+        }
+        int level = tree_key.size();
+        std::shared_ptr<TreeNode> node(new TreeNode(std::move(tree_key)));
+        tree_map[level].push_back(node);
+        if (level < min_lv) {
+            min_lv = level;
+            root = node;
+        }
+        if (level > max_lv) {
+            max_lv = level;
+        }
+    }
+    if (husky::Context::get_global_tid() == 0) {
+        husky::LOG_I << "Min level: " << min_lv << "\tMax level: " << max_lv;
+    }
+
+    // Build the group-set lattice
+    bool has_parent = false;
+    for (int i = min_lv; i < max_lv; ++i) {
+        if (tree_map[i].empty()) {
+            throw husky::base::HuskyException("Level " + std::to_string(i) + " is empty");
+        }
+        for (auto& next_tn : tree_map[i + 1]) {
+            for (auto& tn : tree_map[i]) {
+                if (is_parent(tn, next_tn)) {
+                    tn->add_child(next_tn);
+                    has_parent = true;
+                }
+            }
+            if (!has_parent) {
+                throw husky::base::HuskyException("Cannot find the parent of " + print_key(next_tn->Key()));
+            }
+            has_parent = false;
+        }
+    }
+    if (husky::Context::get_global_tid() == 0) {
+        husky::LOG_I << "Finished constructing lattice.";
+    }
+
+    // Construct the BUC processing tree
+    std::shared_ptr<TreeNode> buc_root(new TreeNode(root->Key()));
+    std::stack<std::shared_ptr<TreeNode>> tmp_stack;
+    std::stack<std::shared_ptr<TreeNode>> buc_stack;
+    tmp_stack.push(root);
+    buc_stack.push(buc_root);
+    while (!tmp_stack.empty()) {
+        std::shared_ptr<TreeNode> cur_node = tmp_stack.top();
+        tmp_stack.pop();
+        std::shared_ptr<TreeNode> cur_buc_node = buc_stack.top();
+        buc_stack.pop();
+        cur_node->visit = true;
+        for (auto& child : cur_node->Children()) {
+            if (!child->visit) {
+                tmp_stack.push(child);
+                std::shared_ptr<TreeNode> new_buc_node(new TreeNode(child->Key()));
+                cur_buc_node->add_child(new_buc_node);
+                buc_stack.push(new_buc_node);
+            }
+        }
+    }
+    out_roots.push_back(buc_root);
+    if (husky::Context::get_global_tid() == 0) {
+        husky::LOG_I << "Finished constructing the BUC processing tree.";
+    }
+
+    /**
+     * Process WHERE
+     * Format: AttrOperatorValue, e.g., fuid<>1:fcard_type='123'
+     */
+    FilterMap filter;
+    if (std::distance(group_filter_tok.begin(), group_filter_tok.end()) == 2) {
+        gf_it++;
+        Tokenizer where_tok(*gf_it, colon_sep);
+        for (auto& where : where_tok) {
+            int pos[2] = {};
+            std::string where_str = where;
+            for (int i = 0; i < where_str.length(); ++i) {
+                if (pos[0] == 0) {
+                    if (is_operator(where_str[i]))
+                        pos[0] = i;
+                } else {
+                    if (!is_operator(where_str[i])) {
+                        pos[1] = i;
+                        break;
+                    }
+                }
+            }
+            if (pos[0] == 0 || pos[1] == 0) {
+                throw husky::base::HuskyException("Invalid syntax in WHERE");
+            }
+            std::string attr = where_str.substr(0, pos[0]);
+            std::string op = where_str.substr(pos[0], pos[1] - pos[0]);
+            std::string value = where_str.substr(pos[1], where_str.length() - pos[1]);
+
+            auto it = std::find(schema_tok.begin(), schema_tok.end(), attr);
+            if (it == schema_tok.end()) {
+                throw husky::base::HuskyException("Invalid attribute in WHERE");
+            }
+            int attr_idx = std::distance(schema_tok.begin(), it);
+            filter[attr_idx] = Filter(op, value);
+        }
+    }
+    out_filters.push_back(filter);
+}
+
+bool pass_filter(const std::string& value, const Filter& filter) {
+    if (boost::iequals(filter.second, std::string("null")))
+        return true;  // Always return true when comparing against null,
+                      // consistent with SQL
+
+    if (filter.first == "<>")
+        return value != filter.second;
+    if (filter.first == ">")
+        return value > filter.second;
+    if (filter.first == "<")
+        return value < filter.second;
+    if (filter.first == ">=")
+        return value >= filter.second;
+    if (filter.first == "<=")
+        return value <= filter.second;
+    if (filter.first == "=")
+        return value == filter.second;
+    // Unknown operator: reject the tuple
+    return false;
+}
+
+void print_buc_tree(const std::shared_ptr<TreeNode>& root) {
+    husky::LOG_I << print_key(root->Key());
+    for (auto& child : root->Children()) {
+        print_buc_tree(child);
+    }
+}
+
+void print_filter_map(const FilterMap& fmap) {
+    for (auto& kv : fmap) {
+        husky::LOG_I << kv.first << " " << kv.second.first << " " << kv.second.second;
+    }
+}
+
+void cube_buc() {
+    gpart_factor = std::stoi(husky::Context::get_param("partition_factor"));
+    ghost = husky::Context::get_param("hdfs_namenode");
+    gport = husky::Context::get_param("hdfs_namenode_port");
+    ghdfs_dest = husky::Context::get_param("output");
+
+    /**
+     * Format of 'schema' and 'select':
+     * attr1,attr2,attr3,...
+     */
+    std::string schema_conf = husky::Context::get_param("schema");
+    std::string select_conf = husky::Context::get_param("select");
+
+    /**
+     * Format of 'group_sets':
+     * {GROUP_SET_1|WHERE_1}{GROUP_SET_2|WHERE_2}{...}{...}
+     * Format of GROUP_SET:
+     * attr1,attr2,attr3:attr2,attr3,attr4:...:...
+     * Format of WHERE:
+     * attr1<>value:attr2=value:...:...
+     */
+    std::string group_conf = husky::Context::get_param("group_sets");
+
+    boost::char_separator<char> comma_sep(",");
+    boost::char_separator<char> colon_sep(":");
+    boost::char_separator<char> brace_sep("{}");
+
+    Tokenizer schema_tok(schema_conf, comma_sep);
+    Tokenizer select_tok(select_conf, comma_sep);
+    Tokenizer group_filter(group_conf, brace_sep);
+
+    AttrIdx select;
+    for (auto& s : select_tok) {
+        auto it = std::find(schema_tok.begin(), schema_tok.end(), s);
+        if (it != schema_tok.end()) {
+            select.push_back(std::distance(schema_tok.begin(), it));
+        } else {
+            throw husky::base::HuskyException("Attribute is not in the schema");
+        }
+    }
+
+    std::vector<std::shared_ptr<TreeNode>> root_vec;
+    std::vector<FilterMap> filter_vec;
+    for (auto& item : group_filter) {
+        std::string item_str = item;
+        parse_group_set(item_str, schema_tok, root_vec, filter_vec);
+    }
+
+    int uid_index = -1;
+    // TODO(Ruihao): The attribute to count is hard-coded as "fuid"
+    auto uid_it = std::find(schema_tok.begin(), schema_tok.end(), "fuid");
+    if (uid_it != schema_tok.end()) {
+        uid_index = std::distance(schema_tok.begin(), uid_it);
+    } else {
+        throw husky::base::HuskyException("Cannot find fuid");
+    }
+
+    std::vector<AttrSet> key_attr_vec;
+    std::vector<AttrSet> msg_attr_vec;
+
+    for (int i = 0; i < root_vec.size(); ++i) {
+        // {key} union {msg} = {select}
+        // {key} intersect {msg} = empty
+        AttrIdx key_attributes = root_vec[i]->Key();
+        AttrIdx msg_attributes;
+        for (auto& s : select) {
+            if (std::find(key_attributes.begin(), key_attributes.end(), s) == key_attributes.end()) {
+                msg_attributes.push_back(s);
+            }
+        }
+
+        // Mapping of attributes in the message table
+        // schema_idx -> msg_table_idx
+        DimMap msg_dim_map;
+        for (int j = 0; j < msg_attributes.size(); ++j) {
+            msg_dim_map[msg_attributes[j]] = j;
+        }
+
+        // Mapping of attributes in the key
+        DimMap key_dim_map;
+        for (int j = 0; j < key_attributes.size(); ++j) {
+            key_dim_map[key_attributes[j]] = j;
+        }
+
+        key_attr_vec.push_back(AttrSet(std::move(key_attributes), std::move(key_dim_map)));
+        msg_attr_vec.push_back(AttrSet(std::move(msg_attributes), std::move(msg_dim_map)));
+    }
+
+    // Load input and emit key\tpid\ti -> uid
+    auto& infmt = husky::io::InputFormatStore::create_orc_inputformat();
+    infmt.set_input(husky::Context::get_param("input"));
+
+    auto& buc_list = husky::ObjListStore::create_objlist<Group>();
+    auto& buc_ch = husky::ChannelStore::create_push_channel<Tuple>(infmt, buc_list);
+    auto& post_list = husky::ObjListStore::create_objlist<Group>();
+    auto& post_ch = husky::ChannelStore::create_push_combined_channel<Pair, PairSumCombiner>(buc_list, post_list);
+
+    Aggregator<int> num_write;  // track the number of records written to hdfs
+    Aggregator<int> num_tuple;  // track the number of tuples read from db
+
+    auto& agg_ch = husky::lib::AggregatorFactory::get_channel();
+
+    auto parser = [&](boost::string_ref& chunk) {
+        std::vector<bool> to_send(root_vec.size(), true);
+        num_tuple.update(1);
+        if (chunk.size() == 0)
+            return;
+        boost::char_separator<char> sep("\t");
+        Tokenizer tok(chunk, sep);
+        for (int i = 0; i < root_vec.size(); ++i) {
+            auto& filter_map = filter_vec[i];
+            auto& key_attributes = key_attr_vec[i];
+            auto& msg_attributes = msg_attr_vec[i];
+            // auto& msg_dim_map = msg_dim_map_vec[i];
+            std::string key = "";
+            Tuple msg(msg_attributes.size());
+            std::string fuid;
+            int j = 0;
+            for (auto& col : tok) {
+                if (filter_map.find(j) != filter_map.end() && !pass_filter(col, filter_map[j])) {
+                    to_send[i] = false;
+                    break;
+                }
+
+                if (key_attributes.has(j)) {
+                    key = key + col + "\t";
+                } else if (msg_attributes.has(j)) {
+                    msg[msg_attributes[j]] = col;
+                } else if (j == uid_index) {
+                    fuid = col;
+                }
+                ++j;
+            }
+            if (to_send[i]) {
+                msg.push_back(fuid);
+                if (gpart_factor > 1) {
+                    int bucket = std::stoi(fuid) % gpart_factor;
+                    key = key + "p" + std::to_string(bucket);
+                }
+                key = key + "\t" + std::to_string(i);
+                buc_ch.push(msg, key);
+            }
+        }
+    };
+
+    husky::load(infmt, parser);
+    husky::lib::AggregatorFactory::sync();
+    if (husky::Context::get_global_tid() == 0) {
+        husky::LOG_I << "Total num of tuple: " << num_tuple.get_value();
+    }
+
+    // Receive
+    husky::list_execute(buc_list, {&buc_ch}, {&post_ch, &agg_ch}, [&](Group& g) {
+        auto& msgs = buc_ch.get(g);
+        TupleVector table(std::move(const_cast<TupleVector&>(msgs)));
+        boost::char_separator<char> sep("\t");
+        boost::tokenizer<boost::char_separator<char>> tok(g.id(), sep);
+        std::vector<std::string> key_value(tok.begin(), tok.end());
+
+        int filter_idx = std::stoi(key_value.back());
+        key_value.pop_back();
+        if (gpart_factor > 1) {
+            // Remove the hash value
+            key_value.pop_back();
+        }
+        key_value.push_back("w" + std::to_string(filter_idx));
+
+        auto& buc_root = root_vec[filter_idx];
+        auto& key_attributes = key_attr_vec[filter_idx];
+        auto& msg_attributes = msg_attr_vec[filter_idx];
+        int uid_dim = msg_attributes.size();
+
+        BUC(buc_root, table, key_value, select, key_attributes, msg_attributes, uid_dim, 0, table.begin(), table.end(),
+            post_ch, num_write);
+    });
+
+    if (gpart_factor > 1) {
+        if (husky::Context::get_global_tid() == 0) {
+            husky::LOG_I << "Finished BUC stage.\nStart post process...";
+        }
+
+        husky::ObjListStore::drop_objlist(buc_list.get_id());
+
+        husky::list_execute(post_list, {&post_ch}, {&agg_ch}, [&post_ch, &num_write](Group& g) {
+            auto& msg = post_ch.get(g);
+            size_t pos = g.id().rfind("\t");
+            std::string key = g.id().substr(0, pos);
+            std::string w_idx = g.id().substr(pos + 1, g.id().length() - pos - 1);
+            std::string hdfs_dest = ghdfs_dest + "/" + w_idx;
+            std::string out = key + "\t" + std::to_string(msg.first) + "\t" + std::to_string(msg.second) + "\n";
+            num_write.update(1);
+            husky::io::HDFS::Write(ghost, gport, out, hdfs_dest, husky::Context::get_global_tid());
+        });
+    }
+
+    int total_num_write = num_write.get_value();
+    if (husky::Context::get_global_tid() == 0) {
+        husky::LOG_I << "Total number of rows written to HDFS: " << total_num_write;
+    }
+}
+
+int main(int argc, char** argv) {
+    std::vector<std::string> args;
+    args.push_back("hdfs_namenode");
+    args.push_back("hdfs_namenode_port");
+    args.push_back("input");
+    args.push_back("output");
+    args.push_back("schema");
+    args.push_back("select");
+    args.push_back("group_sets");
+    args.push_back("partition_factor");
+
+    if (husky::init_with_args(argc, argv, args)) {
+        husky::run_job(cube_buc);
+        return 0;
+    }
+    return 1;
+}
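For reference, a hypothetical parameter set for this job, in the formats documented in cube_buc() above (all attribute names except the required "fuid" are illustrative):

    schema=fuid,fcard_type,fregion
    select=fcard_type,fregion
    group_sets={fcard_type,fregion:fcard_type|fregion<>0}
    partition_factor=4

This computes the groupings (fcard_type, fregion) and its parent (fcard_type) over rows passing the WHERE filter fregion<>0, writing each group with its row count and distinct-fuid count. Note that a tuple's partition is chosen by std::stoi(fuid) % partition_factor, so rows with the same fuid always land in the same partition and summing per-partition distinct counts in the post stage stays correct.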
diff --git a/examples/wc_mr_orc.cpp b/examples/wc_mr_orc.cpp
new file mode 100644
index 0000000..1e79148
--- /dev/null
+++ b/examples/wc_mr_orc.cpp
@@ -0,0 +1,64 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <iostream>
+#include <string>
+#include <vector>
+
+#include "boost/tokenizer.hpp"
+
+#include "core/engine.hpp"
+#include "lib/aggregator_factory.hpp"
+#include "io/input/inputformat_store.hpp"
+
+class Word {
+ public:
+    using KeyT = std::string;
+
+    Word() = default;
+    explicit Word(const KeyT& w) : word(w) {}
+    const KeyT& id() const { return word; }
+
+    KeyT word;
+    int count = 0;
+};
+
+void wc() {
+    auto& infmt = husky::io::InputFormatStore::create_orc_inputformat();
+    infmt.set_input(husky::Context::get_param("input"));
+    husky::lib::Aggregator<int> num_tuple(0, [](int& a, const int& b) { a += b; });
+
+    auto parse_wc = [&](boost::string_ref& chunk) {
+        if (chunk.size() == 0)
+            return;
+        num_tuple.update(1);
+    };
+    husky::load(infmt, parse_wc);
+    husky::lib::AggregatorFactory::sync();
+    if (husky::Context::get_global_tid() == 0) {
+        husky::LOG_I << "Total number of tuples: " << num_tuple.get_value();
+    }
+}
+
+int main(int argc, char** argv) {
+    std::vector<std::string> args;
+    args.push_back("hdfs_namenode");
+    args.push_back("hdfs_namenode_port");
+    args.push_back("input");
+    if (husky::init_with_args(argc, argv, args)) {
+        husky::run_job(wc);
+        return 0;
+    }
+    return 1;
+}
diff --git a/io/input/CMakeLists.txt b/io/input/CMakeLists.txt
index 6676f0a..f7cdd47 100644
--- a/io/input/CMakeLists.txt
+++ b/io/input/CMakeLists.txt
@@ -47,6 +47,11 @@ if(THRIFT_FOUND)
     list(APPEND io-input-src-files ${io-input-flume-src-files})
 endif(THRIFT_FOUND)
 
+if(ORC_FOUND)
+    file(GLOB io-input-orc-src-files orc_file_splitter.cpp orc_inputformat.cpp)
+    list(APPEND io-input-src-files ${io-input-orc-src-files})
+endif(ORC_FOUND)
+
 husky_cache_variable(io-input-src-files ${io-input-src-files})
 
 add_library(input-objs OBJECT ${io-input-src-files})
diff --git a/io/input/inputformat_store.cpp b/io/input/inputformat_store.cpp
index bfa5400..5418805 100644
--- a/io/input/inputformat_store.cpp
+++ b/io/input/inputformat_store.cpp
@@ -78,6 +78,17 @@ BinaryInputFormat& InputFormatStore::create_binary_inputformat(const std::string
     return *binary_input_format;
 }
 
+#ifdef WITH_ORC
+ORCInputFormat& InputFormatStore::create_orc_inputformat() {
+    InputFormatMap& inputformat_map = get_inputformat_map();
+    int id = g_gen_inputformat_id++;
+    ASSERT_MSG(inputformat_map.find(id) == inputformat_map.end(), "Should not be reached");
+    auto* orc_input_format = new ORCInputFormat();
+    inputformat_map.insert({id, orc_input_format});
+    return *orc_input_format;
+}
+#endif
+
 #ifdef WITH_THRIFT
 FlumeInputFormat& InputFormatStore::create_flume_inputformat(std::string rcv_host, int rcv_port) {
     InputFormatMap& inputformat_map = get_inputformat_map();
diff --git a/io/input/inputformat_store.hpp b/io/input/inputformat_store.hpp
index a227f2d..9a370a1 100644
--- a/io/input/inputformat_store.hpp
+++ b/io/input/inputformat_store.hpp
@@ -28,6 +28,9 @@
 #endif
 #include "io/input/separator_inputformat.hpp"
 #include "io/input/xml_inputformat.hpp"
+#ifdef WITH_ORC
+#include "io/input/orc_inputformat.hpp"
+#endif
 
 namespace husky {
 namespace io {
@@ -41,6 +44,9 @@ class InputFormatStore {
     static SeparatorInputFormat& create_separator_inputformat(const std::string& pattern);
     static XMLInputFormat& create_xml_inputformat(const std::string& start_pattern, const std::string& end_pattern);
     static BinaryInputFormat& create_binary_inputformat(const std::string& url, const std::string& filter = "");
+#ifdef WITH_ORC
+    static ORCInputFormat& create_orc_inputformat();
+#endif
 #ifdef WITH_THRIFT
     static FlumeInputFormat& create_flume_inputformat(std::string rcv_host, int rcv_port);
 #endif
diff --git a/io/input/orc_file_splitter.cpp b/io/input/orc_file_splitter.cpp
new file mode 100644
index 0000000..c42e127
--- /dev/null
+++ b/io/input/orc_file_splitter.cpp
@@ -0,0 +1,164 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifdef WITH_ORC
+
+#include "io/input/orc_file_splitter.hpp"
+
+#include <cstring>
+#include <string>
+#include <vector>
+
+#include "boost/utility/string_ref.hpp"
+#ifdef WITH_HDFS
+#include "hdfs/hdfs.h"
+#endif
+#include "orc/ColumnPrinter.hh"
+#include "orc/OrcFile.hh"
+#include "orc/orc-config.hh"
+
+#include "base/log.hpp"
+#include "base/serialization.hpp"
+#include "core/constants.hpp"
+#include "core/context.hpp"
+#include "core/coordinator.hpp"
+#include "io/input/orc_hdfs_inputstream.hpp"
+
+namespace orc {
+
+class SQLColumnPrinter : public ColumnPrinter {
+ public:
+    SQLColumnPrinter(std::string& buffer, const Type& type) : ColumnPrinter(buffer) {
+        for (unsigned int i = 0; i < type.getSubtypeCount(); ++i) {
+            // fieldNames.push_back(type.getFieldName(i));
+            fieldPrinter.push_back(createColumnPrinter(buffer, type.getSubtype(i)).release());
+        }
+    }
+
+    virtual ~SQLColumnPrinter() {
+        for (size_t i = 0; i < fieldPrinter.size(); ++i) {
+            delete fieldPrinter[i];
+        }
+    }
+
+    void printRow(uint64_t rowId) override {
+        if (hasNulls && !notNull[rowId]) {
+            writeString(buffer, "null");
+        } else {
+            // writeChar(buffer, '{');
+            for (unsigned int i = 0; i < fieldPrinter.size(); ++i) {
+                if (i != 0) {
+                    writeString(buffer, "\t");
+                }
+                // writeChar(buffer, '"');
+                // writeString(buffer, fieldNames[i].c_str());
+                // writeString(buffer, "\": ");
+                fieldPrinter[i]->printRow(rowId);
+            }
+            // writeChar(buffer, '}');
+        }
+    }
+
+    void reset(const ColumnVectorBatch& batch) override {
+        ColumnPrinter::reset(batch);
+        const StructVectorBatch& structBatch = dynamic_cast<const StructVectorBatch&>(batch);
+        for (size_t i = 0; i < fieldPrinter.size(); ++i) {
+            fieldPrinter[i]->reset(*(structBatch.fields[i]));
+        }
+    }
+
+ private:
+    void writeChar(std::string& file, char ch) { file += ch; }
+
+    void writeString(std::string& file, const char* ptr) {
+        size_t len = strlen(ptr);
+        file.append(ptr, len);
+    }
+
+    std::vector<ColumnPrinter*> fieldPrinter;
+};
+
+}  // namespace orc
+
+namespace husky {
+namespace io {
+
+using orc::ColumnVectorBatch;
+using orc::createReader;
+using orc::ReaderOptions;
+using orc::readLocalFile;
+using orc::SQLColumnPrinter;
+
+// default number of lines in one read operation
+// size_t ORCFileSplitter::row_batch_size = 8 * 1024;
+
+ORCFileSplitter::ORCFileSplitter() {}
+
+ORCFileSplitter::~ORCFileSplitter() { hdfsDisconnect(fs_); }
+
+// initialize the reader with the file url
+void ORCFileSplitter::load(std::string url) {
+    cur_fn_ = "";
+    url_ = url;
+
+    struct hdfsBuilder* builder = hdfsNewBuilder();
+    hdfsBuilderSetNameNode(builder, husky::Context::get_param("hdfs_namenode").c_str());
+    hdfsBuilderSetNameNodePort(builder, std::stoi(husky::Context::get_param("hdfs_namenode_port")));
+    fs_ = hdfsBuilderConnect(builder);
+    hdfsFreeBuilder(builder);
+}
+
+// ask the master for a filename and offset
+boost::string_ref ORCFileSplitter::fetch_batch() {
+    BinStream question;
+    question << url_;
+    BinStream answer = husky::Context::get_coordinator()->ask_master(question, husky::TYPE_ORC_BLK_REQ);
+    std::string fn;
+    size_t offset;
+    answer >> fn;
+    answer >> offset;
+    if (fn == "") {
+        return "";
+    } else if (fn != cur_fn_) {
+        cur_fn_ = fn;
+        ReaderOptions opts;
+        reader_ = createReader(read_hdfs_file(fs_, cur_fn_), opts);
+    }
+    return read_by_batch(offset);
+}
+
+boost::string_ref ORCFileSplitter::read_by_batch(size_t offset) {
+    buffer_.clear();
+    try {
+        std::string line = "";
+        reader_->seekToRow(offset);
+        std::unique_ptr<SQLColumnPrinter> printer(new SQLColumnPrinter(line, reader_->getSelectedType()));
+        std::unique_ptr<ColumnVectorBatch> batch = reader_->createRowBatch(kOrcRowBatchSize);
+
+        if (reader_->next(*batch)) {
+            printer->reset(*batch);
+            for (unsigned int i = 0; i < batch->numElements; ++i) {
+                line.clear();
+                printer->printRow(i);
+                line += "\n";
+                buffer_ += line;
+            }
+        }
+    } catch (const std::exception& e) {
+        LOG_I << e.what();
+    }
+    return boost::string_ref(buffer_);
+}
+
+}  // namespace io
+}  // namespace husky
+
+#endif
diff --git a/io/input/orc_file_splitter.hpp b/io/input/orc_file_splitter.hpp
new file mode 100644
index 0000000..648463f
--- /dev/null
+++ b/io/input/orc_file_splitter.hpp
@@ -0,0 +1,55 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#ifdef WITH_ORC
+
+#include <memory>
+#include <string>
+
+#include "boost/utility/string_ref.hpp"
+#include "hdfs/hdfs.h"
+#include "orc/OrcFile.hh"
+
+#include "io/input/file_splitter_base.hpp"
+
+namespace husky {
+namespace io {
+
+class ORCFileSplitter {
+ public:
+    ORCFileSplitter();
+    virtual ~ORCFileSplitter();
+
+    // initialize with the url of the orc file
+    virtual void load(std::string url);
+    boost::string_ref fetch_batch();
+
+ protected:
+    boost::string_ref read_by_batch(size_t offset);
+
+    std::string buffer_;
+    // url may be a directory or a file
+    std::string url_;
+    // current filename
+    std::string cur_fn_;
+    // orc reader used to read orc files
+    std::unique_ptr<orc::Reader> reader_;
+
+    hdfsFS fs_;
+};
+
+}  // namespace io
+}  // namespace husky
+
+#endif
diff --git a/io/input/orc_hdfs_inputstream.hpp b/io/input/orc_hdfs_inputstream.hpp
new file mode 100644
index 0000000..fcd11f3
--- /dev/null
+++ b/io/input/orc_hdfs_inputstream.hpp
@@ -0,0 +1,92 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#ifdef WITH_ORC
+
+#include <cassert>
+#include <memory>
+#include <mutex>
+#include <string>
+
+#include "hdfs/hdfs.h"
+#include "orc/OrcFile.hh"
+
+#include "base/exception.hpp"
+#include "base/log.hpp"
+#include "base/thread_support.hpp"
+
+namespace husky {
+namespace io {
+
+using husky::base::HuskyException;
+
+const int kOrcRowBatchSize = 5000;
+
+class HDFSFileInputStream final : public orc::InputStream {
+ public:
+    HDFSFileInputStream(hdfsFS hdfs_fs, const std::string& file) {
+        hdfs_fs_ = hdfs_fs;
+        file_name_ = file;
+        hdfs_file_ = hdfsOpenFile(hdfs_fs_, file_name_.c_str(), O_RDONLY, 0, 0, 0);
+        assert(hdfs_file_ != NULL);
+        hdfsFileInfo* file_info = hdfsGetPathInfo(hdfs_fs_, file_name_.c_str());
+        length_ = file_info->mSize;
+        hdfsFreeFileInfo(file_info, 1);
+    }
+
+    ~HDFSFileInputStream() { hdfsCloseFile(hdfs_fs_, hdfs_file_); }
+
+    uint64_t getLength() const override { return static_cast<uint64_t>(length_); }
+
+    uint64_t getNaturalReadSize() const override { return 128 * 1024; }
+
+    void read(void* buf, uint64_t length, uint64_t offset) override {
+        if (!buf)
+            throw HuskyException("Buffer is null");
+
+        hdfsSeek(hdfs_fs_, hdfs_file_, offset);
+        int32_t remain = static_cast<int32_t>(length);
+        int32_t start = 0;
+        int32_t nbytes = 0;
+        while (remain > 0) {
+            // hdfsRead returns at most 128KB per call
+            nbytes = hdfsRead(hdfs_fs_, hdfs_file_, static_cast<char*>(buf) + start, remain);
+            if (nbytes == -1)
+                throw HuskyException("Bad read of " + file_name_);
+            start += nbytes;
+            remain -= nbytes;
+        }
+
+        if (static_cast<uint64_t>(start) != length)
+            throw HuskyException("Short read of " + file_name_);
+    }
+
+    const std::string& getName() const override { return file_name_; }
+
+ private:
+    std::string file_name_;
+    hdfsFile hdfs_file_;
+    hdfsFS hdfs_fs_;
+    int64_t length_;
+    std::mutex read_mutex;
+};
+
+inline std::unique_ptr<orc::InputStream> read_hdfs_file(hdfsFS hdfs_fs, const std::string& path) {
+    return std::unique_ptr<orc::InputStream>(new HDFSFileInputStream(hdfs_fs, path));
+}
+
+}  // namespace io
+}  // namespace husky
+
+#endif
diff --git a/io/input/orc_inputformat.cpp b/io/input/orc_inputformat.cpp
new file mode 100644
index 0000000..dcf8451
--- /dev/null
+++ b/io/input/orc_inputformat.cpp
@@ -0,0 +1,81 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "io/input/orc_inputformat.hpp"
+
+#include <string>
+#include <vector>
+
+#include "boost/utility/string_ref.hpp"
+
+#include "base/log.hpp"
+#include "io/input/inputformat_helper.hpp"
+
+namespace husky {
+namespace io {
+
+enum ORCInputFormatSetUp {
+    NotSetUp = 0,
+    InputSetUp = 1 << 1,
+    AllSetUp = InputSetUp,
+};
+
+ORCInputFormat::ORCInputFormat() { is_setup_ = ORCInputFormatSetUp::NotSetUp; }
+
+bool ORCInputFormat::is_setup() const { return !(is_setup_ ^ ORCInputFormatSetUp::AllSetUp); }
+
+void ORCInputFormat::set_input(const std::string& url) {
+    if (!url_.empty() && url_ == url)
+        // Setting the same url as last time is a no-op.
+        return;
+    url_ = url;
+    size_t prefix = url_.find("://");
+    ASSERT_MSG(prefix != std::string::npos, ("Cannot analyze protocol from " + url_).c_str());
+    std::string protocol = url_.substr(0, prefix);
+    splitter_.load(url_.substr(prefix + 3));
+    is_setup_ |= ORCInputFormatSetUp::InputSetUp;
+}
+
+// buffer_ obtained from the orc_splitter must consist of '\n'-separated lines;
+// this saves us a lot of block handling
+bool ORCInputFormat::next(boost::string_ref& ref) {
+    if (buffer_.empty() || r == buffer_.size() - 1) {
+        clear_buffer();
+        bool success = fetch_new_batch();
+        if (success == false) {
+            return false;
+        }
+    }
+    r = helper::find_next(buffer_, l, '\n');
+    ref = buffer_.substr(l, r - l);
+    l = helper::find_next(buffer_, r, '\n') + 1;
+    return true;
+}
+
+bool ORCInputFormat::fetch_new_batch() {
+    buffer_ = splitter_.fetch_batch();
+    if (buffer_.empty()) {
+        return false;
+    }
+    return true;
+}
+
+void ORCInputFormat::clear_buffer() {
+    buffer_.clear();
+    l = r = 0;
+}
+
+}  // namespace io
+}  // namespace husky
diff --git a/io/input/orc_inputformat.hpp b/io/input/orc_inputformat.hpp
new file mode 100644
index 0000000..34009af
--- /dev/null
+++ b/io/input/orc_inputformat.hpp
@@ -0,0 +1,52 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "boost/utility/string_ref.hpp"
+
+#include "io/input/inputformat_base.hpp"
+#include "io/input/orc_file_splitter.hpp"
+
+namespace husky {
+namespace io {
+
+class ORCInputFormat : public InputFormatBase {
+ public:
+    typedef boost::string_ref RecordT;
+
+    ORCInputFormat();
+    virtual ~ORCInputFormat() = default;
+
+    virtual void set_input(const std::string& url);
+    virtual bool next(boost::string_ref& ref);
+    virtual bool is_setup() const;
+
+ protected:
+    bool fetch_new_batch();
+    void clear_buffer();
+
+    std::string url_;
+    size_t l = 0;
+    size_t r = 0;
+    boost::string_ref buffer_;
+    ORCFileSplitter splitter_;
+};
+
+}  // namespace io
+}  // namespace husky
diff --git a/master/CMakeLists.txt b/master/CMakeLists.txt
index ebf0da8..46976d6 100644
--- a/master/CMakeLists.txt
+++ b/master/CMakeLists.txt
@@ -25,6 +25,10 @@ if(MONGOCLIENT_FOUND)
     list(APPEND master_plugins mongodb_assigner.cpp)
 endif(MONGOCLIENT_FOUND)
 
+if(ORC_FOUND)
+    list(APPEND master_plugins orc_assigner.cpp)
+endif(ORC_FOUND)
+
 add_library(husky-master-objs OBJECT master.cpp ${master_plugins})
 husky_default_properties(husky-master-objs)
diff --git a/master/orc_assigner.cpp b/master/orc_assigner.cpp
new file mode 100644
index 0000000..1be9755
--- /dev/null
+++ b/master/orc_assigner.cpp
@@ -0,0 +1,152 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifdef WITH_ORC
+
+#include "master/orc_assigner.hpp"
+
+#include <functional>
+#include <string>
+#include <utility>
+
+#include "boost/filesystem.hpp"
+#include "orc/OrcFile.hh"
+
+#include "base/log.hpp"
+#include "core/constants.hpp"
+#include "core/context.hpp"
+#include "io/input/orc_hdfs_inputstream.hpp"
+#include "master/master.hpp"
+
+namespace husky {
+
+using orc::Reader;
+using orc::ReaderOptions;
+using orc::createReader;
+using orc::readLocalFile;
+using orc::ColumnVectorBatch;
+
+static ORCBlockAssigner orc_block_assigner;
+
+ORCBlockAssigner::ORCBlockAssigner() {
+    Master::get_instance().register_main_handler(TYPE_ORC_BLK_REQ,
+                                                 std::bind(&ORCBlockAssigner::master_main_handler, this));
+    Master::get_instance().register_setup_handler(std::bind(&ORCBlockAssigner::master_setup_handler, this));
+}
+
+void ORCBlockAssigner::master_main_handler() {
+    auto& master = Master::get_instance();
+    auto resp_socket = master.get_socket();
+    std::string url;
+    BinStream stream = zmq_recv_binstream(resp_socket.get());
+    stream >> url;
+
+    std::pair<std::string, size_t> ret = answer(url);
+    stream.clear();
+    stream << ret.first << ret.second;
+
+    zmq_sendmore_string(resp_socket.get(), master.get_cur_client());
+    zmq_sendmore_dummy(resp_socket.get());
+    zmq_send_binstream(resp_socket.get(), stream);
+
+    LOG_I << " => " << ret.first << "@" << ret.second;
+}
+
+void ORCBlockAssigner::master_setup_handler() {
+    init_hdfs(Context::get_param("hdfs_namenode"), Context::get_param("hdfs_namenode_port"));
+    num_workers_alive = Context::get_num_workers();
+}
+
+void ORCBlockAssigner::init_hdfs(const std::string& node, const std::string& port) {
+    int num_retries = 3;
+    while (num_retries--) {
+        struct hdfsBuilder* builder = hdfsNewBuilder();
+        hdfsBuilderSetNameNode(builder, node.c_str());
+        hdfsBuilderSetNameNodePort(builder, std::stoi(port));
+        fs_ = hdfsBuilderConnect(builder);
+        hdfsFreeBuilder(builder);
+        if (fs_)
+            break;
+    }
+    if (fs_) {
+        return;
+    }
+    LOG_I << "Failed to connect to HDFS " << node << ":" << port;
+}
+
+void ORCBlockAssigner::browse_hdfs(const std::string& url) {
+    if (!fs_)
+        return;
+
+    try {
+        int num_files;
+        size_t total = 0;
+        auto& files = files_dict[url];
+        hdfsFileInfo* file_info = hdfsListDirectory(fs_, url.c_str(), &num_files);
+        for (int i = 0; i < num_files; ++i) {
+            // for every file in the directory
+            if (file_info[i].mKind != kObjectKindFile)
+                continue;
+            ReaderOptions opts;
+            reader = createReader(io::read_hdfs_file(fs_, file_info[i].mName), opts);
+            size_t num_rows = reader->getNumberOfRows();
+            files.push_back(OrcFileDesc{std::string(file_info[i].mName) + '\0', num_rows, 0});
+            total += num_rows;
+        }
+        LOG_I << "Total num of rows: " << total;
+        hdfsFreeFileInfo(file_info, num_files);
+    } catch (const std::exception& ex) {
+        LOG_I << "Exception caught: " << ex.what();
+    }
+}
+
+/**
+ * @return selected_file, offset
+ */
+std::pair<std::string, size_t> ORCBlockAssigner::answer(std::string& url) {
+    if (!fs_) {
+        return {"", 0};
+    }
+    // Directory or file status initialization.
+    // This condition is true either at the beginning of the file or
+    // after all the workers have finished reading this file or directory.
+    if
 (files_dict.find(url) == files_dict.end()) {
+        browse_hdfs(url);
+        finish_dict[url] = 0;
+    }
+
+    // empty url
+    auto& files = files_dict[url];
+    if (files.empty()) {
+        finish_dict[url] += 1;
+        if (finish_dict[url] == num_workers_alive) {
+            files_dict.erase(url);
+        }
+        return {"", 0};
+    }
+
+    auto& file = files.back();
+    std::pair<std::string, size_t> ret = {file.filename, file.offset};
+    file.offset += io::kOrcRowBatchSize;
+
+    // remove the file once all of its rows have been handed out
+    if (file.offset > file.num_of_rows)
+        files.pop_back();
+
+    return ret;
+}
+
+}  // namespace husky
+
+#endif
diff --git a/master/orc_assigner.hpp b/master/orc_assigner.hpp
new file mode 100644
index 0000000..9a6a234
--- /dev/null
+++ b/master/orc_assigner.hpp
@@ -0,0 +1,59 @@
+// Copyright 2016 Husky Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+#pragma once
+
+#ifdef WITH_ORC
+
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "orc/OrcFile.hh"
+
+#include "io/input/orc_hdfs_inputstream.hpp"
+#include "master/master.hpp"
+
+namespace husky {
+
+struct OrcFileDesc {
+    std::string filename;
+    size_t num_of_rows;
+    size_t offset;
+};
+
+class ORCBlockAssigner {
+ public:
+    ORCBlockAssigner();
+    ~ORCBlockAssigner() = default;
+
+    void master_main_handler();
+    void master_setup_handler();
+    void init_hdfs(const std::string& node, const std::string& port);
+    void browse_hdfs(const std::string& url);
+    std::pair<std::string, size_t> answer(std::string& url);
+
+ private:
+    hdfsFS fs_ = NULL;
+    // int row_batch_size = 8 * 1024;
+    int num_workers_alive;
+    // std::map<std::string, size_t> file_offset;
+    // std::map<std::string, size_t> file_size;
+    std::map<std::string, std::vector<OrcFileDesc>> files_dict;
+    std::map<std::string, int> finish_dict;
+    std::unique_ptr<orc::Reader> reader;
+};
+
+}  // namespace husky
+
+#endif
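Usage sketch (not part of the patch): once ORC is detected at configure time, WordCountORC and Cube build automatically, and a master built with ORC_FOUND registers the ORCBlockAssigner handler for TYPE_ORC_BLK_REQ. A job config for WordCountORC might look like the following, assuming the usual Husky configuration keys; all host names and paths are placeholders:

    # wc_orc.conf -- illustrative values only
    master_host=master.domain
    master_port=10086
    comm_port=12306
    hdfs_namenode=namenode.domain
    hdfs_namenode_port=9000
    input=hdfs:///datasets/orc_table
    [worker]
    info=worker1:4

    ./Master --conf wc_orc.conf
    ./WordCountORC --conf wc_orc.conf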