diff --git a/cpp/src/cylon/CMakeLists.txt b/cpp/src/cylon/CMakeLists.txt index 617141f87..d24d465a2 100644 --- a/cpp/src/cylon/CMakeLists.txt +++ b/cpp/src/cylon/CMakeLists.txt @@ -64,6 +64,7 @@ add_library(cylon SHARED arrow/arrow_types.hpp column.cpp column.hpp + compute/aggregate_kernels.cpp compute/aggregate_kernels.hpp compute/aggregate_utils.hpp compute/aggregates.cpp @@ -102,6 +103,8 @@ add_library(cylon SHARED join/join_utils.hpp join/sort_join.cpp join/sort_join.hpp + mapreduce/mapreduce.hpp + mapreduce/mapreduce.cpp net/channel.hpp net/comm_operations.hpp net/comm_type.hpp diff --git a/cpp/src/cylon/arrow/arrow_comparator.cpp b/cpp/src/cylon/arrow/arrow_comparator.cpp index 333f15633..36ce652da 100644 --- a/cpp/src/cylon/arrow/arrow_comparator.cpp +++ b/cpp/src/cylon/arrow/arrow_comparator.cpp @@ -781,6 +781,25 @@ Status TableRowIndexEqualTo::Make(const std::shared_ptr &table, return Status::OK(); } +Status TableRowIndexEqualTo::Make(const std::vector> &arrays, + std::unique_ptr *out_equal_to) { + auto comps = std::make_shared>>(); + comps->reserve(arrays.size()); + + for (const auto &array: arrays) { + if (array->length() == 0) { + comps->emplace_back(std::make_shared()); + } else { + std::unique_ptr comp; + RETURN_CYLON_STATUS_IF_FAILED(CreateArrayIndexComparator(array, &comp)); + comps->emplace_back(std::move(comp)); + } + } + + *out_equal_to = std::make_unique(std::move(comps)); + return Status::OK(); +} + bool TableRowIndexEqualTo::operator()(const int64_t &record1, const int64_t &record2) const { return std::all_of(comps->begin(), comps->end(), [&](const std::shared_ptr &comp) { @@ -839,10 +858,9 @@ Status TableRowIndexHash::Make(const std::shared_ptr &table, Status TableRowIndexHash::Make(const std::vector> &arrays, std::unique_ptr *hash) { const int64_t len = arrays[0]->length(); - if (std::all_of(arrays.begin() + 1, arrays.end(), [&](const std::shared_ptr &arr) { - return arr->length() == len; - })) { - return {Code::Invalid, "array lengths should be equal"}; + if (std::any_of(arrays.begin() + 1, arrays.end(), + [&](const auto &arr) { return arr->length() != len; })) { + return {Code::Invalid, "TableRowIndexHash array lengths should be equal"}; } auto hashes_ptr = std::make_shared>(len, 0); diff --git a/cpp/src/cylon/arrow/arrow_comparator.hpp b/cpp/src/cylon/arrow/arrow_comparator.hpp index 336073156..0d5dce9d6 100644 --- a/cpp/src/cylon/arrow/arrow_comparator.hpp +++ b/cpp/src/cylon/arrow/arrow_comparator.hpp @@ -175,6 +175,8 @@ class TableRowIndexEqualTo { static Status Make(const std::shared_ptr &table, const std::vector &col_ids, std::unique_ptr *out_equal_to); + static Status Make(const std::vector> &arrays, + std::unique_ptr *out_equal_to); private: // this class gets copied to std container, so we don't want to copy these vectors. diff --git a/cpp/src/cylon/compute/aggregate_kernels.cpp b/cpp/src/cylon/compute/aggregate_kernels.cpp new file mode 100644 index 000000000..655c38cdc --- /dev/null +++ b/cpp/src/cylon/compute/aggregate_kernels.cpp @@ -0,0 +1,38 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include "cylon/compute/aggregate_kernels.hpp" + +namespace cylon { +namespace compute { + +std::shared_ptr MakeAggregationOpFromID(AggregationOpId id) { + switch (id) { + case SUM:return std::make_shared(); + case MIN:return std::make_shared(); + case MAX:return std::make_shared(); + case COUNT:return std::make_shared(); + case MEAN:return std::make_shared(); + case VAR:return std::make_shared(); + case STDDEV:return std::make_shared(); + case NUNIQUE:return std::make_shared(); + case QUANTILE:return std::make_shared(); + } + + return nullptr; +} + +} +} \ No newline at end of file diff --git a/cpp/src/cylon/compute/aggregate_kernels.hpp b/cpp/src/cylon/compute/aggregate_kernels.hpp index 81243097a..6244c0cc3 100644 --- a/cpp/src/cylon/compute/aggregate_kernels.hpp +++ b/cpp/src/cylon/compute/aggregate_kernels.hpp @@ -15,10 +15,13 @@ #ifndef CYLON_CPP_SRC_CYLON_COMPUTE_AGGREGATE_KERNELS_HPP_ #define CYLON_CPP_SRC_CYLON_COMPUTE_AGGREGATE_KERNELS_HPP_ +#include #include -#include +#include #include +#include "cylon/util/macros.hpp" + namespace cylon { namespace compute { @@ -56,11 +59,6 @@ struct KernelOptions { virtual ~KernelOptions() = default; }; -/** - * special kernel options holder for default options. i.e. no special options. - */ -struct DefaultKernelOptions : public KernelOptions {}; - /** * Variance kernel options */ @@ -89,36 +87,41 @@ struct QuantileKernelOptions : public KernelOptions { /** * 2. Aggregation operation */ -struct AggregationOp { - /** - * @param id AggregationOpId - * @param options unique_ptr of options - */ - AggregationOp(AggregationOpId id, std::unique_ptr options) - : id(id), options(std::move(options)) {} +//struct AggregationOp { +// /** +// * @param id AggregationOpId +// * @param options unique_ptr of options +// */ +// AggregationOp(AggregationOpId id, KernelOptions *options) +// : id(id), options(options) {} +// +// /** +// * Constructor with uninitialized options. This would be explicitly handled in impl +// * @param id +// */ +// explicit AggregationOp(AggregationOpId id) : id(id), options(nullptr) {} +// +// virtual ~AggregationOp() = default; +// +// AggregationOpId id; +// KernelOptions* options; +//}; - /** - * Constructor with uninitialized options. 
This would be explicitly handled in impl - * @param id - */ - explicit AggregationOp(AggregationOpId id) : id(id), options(nullptr) {} - - virtual ~AggregationOp() = default; - - AggregationOpId id; - std::unique_ptr options; +struct AggregationOp { + virtual AggregationOpId id() const = 0; + virtual KernelOptions *options() const { return nullptr; }; }; /** - * Base aggregation operation with DefaultKernelOptions + * Base aggregation operation with KernelOptions * @tparam ID AggregationOpId */ template struct BaseAggregationOp : public AggregationOp { - BaseAggregationOp() : AggregationOp(ID, std::make_unique()) {} + AggregationOpId id() const override { return ID; } - static inline std::unique_ptr Make() { - return std::make_unique>(); + static std::shared_ptr Make() { + return std::make_shared>(); } }; @@ -136,21 +139,30 @@ struct NUniqueOp : public BaseAggregationOp {}; * Var op */ struct VarOp : public AggregationOp { + std::shared_ptr opt; + /** * @param ddof delta degree of freedom */ - explicit VarOp(int ddof) : AggregationOp(VAR, std::make_unique(ddof)) {} + explicit VarOp(int ddof = 1) : opt(std::make_shared(ddof)) {} + explicit VarOp(const std::shared_ptr &opt) + : opt(std::static_pointer_cast(opt)) {} - static inline std::unique_ptr Make(int ddof = 0) { - return std::make_unique(ddof); + AggregationOpId id() const override { return VAR; } + KernelOptions *options() const override { return opt.get(); } + + static std::shared_ptr Make(int ddof = 1) { + return std::make_shared(ddof); } }; -struct StdDevOp : public AggregationOp { - explicit StdDevOp(int ddof) : AggregationOp(STDDEV, std::make_unique(ddof)) {} +struct StdDevOp : public VarOp { + explicit StdDevOp(int ddof = 1) : VarOp(ddof) {} + explicit StdDevOp(const std::shared_ptr &opt) : VarOp(opt) {} + AggregationOpId id() const override { return STDDEV; } - static inline std::unique_ptr Make(int ddof = 0) { - return std::make_unique(ddof); + static std::shared_ptr Make(int ddof = 1) { + return std::make_shared(ddof); } }; @@ -158,18 +170,26 @@ struct StdDevOp : public AggregationOp { * Var op */ struct QuantileOp : public AggregationOp { + std::shared_ptr opt; + /** * @param quantile */ - explicit QuantileOp(double quantile) : AggregationOp(QUANTILE, - std::make_unique( - quantile)) {} + explicit QuantileOp(double quantile = 0.5) + : opt(std::make_shared(quantile)) {} + explicit QuantileOp(const std::shared_ptr &opt) + : opt(std::static_pointer_cast(opt)) {} - static inline std::unique_ptr Make(double quantile = 0.5) { - return std::make_unique(quantile); + AggregationOpId id() const override { return QUANTILE; } + KernelOptions *options() const override { return opt.get(); } + + static std::shared_ptr Make(double quantile = 0.5) { + return std::make_shared(quantile); } }; +std::shared_ptr MakeAggregationOpFromID(AggregationOpId id); + // ----------------------------------------------------------------------------- /** @@ -193,7 +213,7 @@ template struct KernelTraits { using State = std::tuple; // using ResultT = T; - using Options = DefaultKernelOptions; + using Options = KernelOptions; static constexpr const char *name() { return "sum_"; } }; @@ -202,7 +222,7 @@ template struct KernelTraits { using State = std::tuple; // using ResultT = T; - using Options = DefaultKernelOptions; + using Options = KernelOptions; static constexpr const char *name() { return "mean_"; } }; @@ -226,7 +246,7 @@ template struct KernelTraits { using State = std::tuple; using ResultT = int64_t; - using Options = DefaultKernelOptions; + using 
Options = KernelOptions; static constexpr const char *name() { return "count_"; } }; @@ -234,7 +254,7 @@ template struct KernelTraits { using State = std::tuple; using ResultT = T; - using Options = DefaultKernelOptions; + using Options = KernelOptions; static constexpr const char *name() { return "min_"; } }; @@ -242,7 +262,7 @@ template struct KernelTraits { using State = std::tuple; using ResultT = T; - using Options = DefaultKernelOptions; + using Options = KernelOptions; static constexpr const char *name() { return "max_"; } }; @@ -250,7 +270,7 @@ template struct KernelTraits { using State = std::unordered_set; using ResultT = int64_t; - using Options = DefaultKernelOptions; + using Options = KernelOptions; static constexpr const char *name() { return "nunique_"; } }; @@ -350,7 +370,7 @@ struct TypedAggregationKernel : public AggregationKernel { template class MeanKernel : public TypedAggregationKernel, MEAN, T> { public: - void KernelSetup(DefaultKernelOptions *options) { + void KernelSetup(KernelOptions *options) { CYLON_UNUSED(options); }; @@ -415,7 +435,7 @@ class VarianceKernel : public TypedAggregationKernel, VAR, T> */ template struct SumKernel : public TypedAggregationKernel, SUM, T> { - void KernelSetup(DefaultKernelOptions *options) { + void KernelSetup(KernelOptions *options) { CYLON_UNUSED(options); }; inline void KernelInitializeState(std::tuple *state) const { @@ -434,7 +454,7 @@ struct SumKernel : public TypedAggregationKernel, SUM, T> { */ template struct CountKernel : public TypedAggregationKernel, COUNT, T> { - void KernelSetup(DefaultKernelOptions *options) { + void KernelSetup(KernelOptions *options) { CYLON_UNUSED(options); }; inline void KernelInitializeState(std::tuple *state) const { @@ -454,7 +474,7 @@ struct CountKernel : public TypedAggregationKernel, COUNT, T> { */ template struct MinKernel : public TypedAggregationKernel, MIN, T> { - void KernelSetup(DefaultKernelOptions *options) { + void KernelSetup(KernelOptions *options) { CYLON_UNUSED(options); }; inline void KernelInitializeState(std::tuple *state) const { @@ -473,7 +493,7 @@ struct MinKernel : public TypedAggregationKernel, MIN, T> { */ template struct MaxKernel : public TypedAggregationKernel, MAX, T> { - void KernelSetup(DefaultKernelOptions *options) { + void KernelSetup(KernelOptions *options) { CYLON_UNUSED(options); }; inline void KernelInitializeState(std::tuple *state) const { @@ -489,7 +509,7 @@ struct MaxKernel : public TypedAggregationKernel, MAX, T> { template struct NUniqueKernel : public TypedAggregationKernel, NUNIQUE, T> { - void KernelSetup(DefaultKernelOptions *options) { + void KernelSetup(KernelOptions *options) { CYLON_UNUSED(options); }; inline void KernelInitializeState(std::unordered_set *state) const { CYLON_UNUSED(state); } diff --git a/cpp/src/cylon/groupby/hash_groupby.cpp b/cpp/src/cylon/groupby/hash_groupby.cpp index 83172dd02..1e682c772 100644 --- a/cpp/src/cylon/groupby/hash_groupby.cpp +++ b/cpp/src/cylon/groupby/hash_groupby.cpp @@ -149,7 +149,7 @@ template &table, int col_idx, - const compute::AggregationOp &agg_op, + const compute::AggregationOp *agg_op, const std::vector &group_ids, int64_t unique_groups, std::shared_ptr *agg_array, @@ -191,7 +191,7 @@ inline Status aggregate(arrow::MemoryPool *pool, return {Code::Invalid, "unsupported aggregate op"}; } - compute::KernelOptions *options = agg_op.options.get(); + compute::KernelOptions *options = agg_op->options(); if (options != nullptr) { kernel->Setup(options); } else { @@ -237,12 +237,12 @@ template> inline 
Status resolve_op(arrow::MemoryPool *pool, const std::shared_ptr &table, int col_idx, - const compute::AggregationOp &agg_op, + const compute::AggregationOp *agg_op, const std::vector &group_ids, int64_t unique_groups, std::shared_ptr *agg_array, std::shared_ptr *agg_field) { - switch (agg_op.id) { + switch (agg_op->id()) { case compute::SUM: return aggregate(pool, table, col_idx, agg_op, group_ids, unique_groups, agg_array, agg_field); @@ -277,7 +277,7 @@ inline Status resolve_op(arrow::MemoryPool *pool, inline Status do_aggregate(arrow::MemoryPool *pool, const std::shared_ptr &table, int col_idx, - const compute::AggregationOp &agg_op, + const compute::AggregationOp *agg_op, const std::vector &group_ids, int64_t unique_groups, std::shared_ptr *agg_array, @@ -377,8 +377,8 @@ Status HashGroupBy(const std::shared_ptr &table, for (auto &&p: aggregations) { std::shared_ptr new_arr; std::shared_ptr new_field; - RETURN_CYLON_STATUS_IF_FAILED(do_aggregate(pool, atable, p.first, *p.second, group_ids, - unique_groups, &new_arr, &new_field)); + RETURN_CYLON_STATUS_IF_FAILED(do_aggregate(pool, atable, p.first, p.second.get(), + group_ids, unique_groups, &new_arr, &new_field)); new_arrays.push_back(std::make_shared(std::move(new_arr))); new_fields.push_back(std::move(new_field)); } @@ -405,7 +405,7 @@ Status HashGroupBy(const std::shared_ptr
&table, aggregations.reserve(aggregate_cols.size()); for (auto &&p:aggregate_cols) { // create AggregationOp with nullptr options - aggregations.emplace_back(p.first, std::make_shared(p.second)); + aggregations.emplace_back(p.first, compute::MakeAggregationOpFromID(p.second)); } return HashGroupBy(table, idx_cols, aggregations, output); @@ -423,7 +423,8 @@ Status HashGroupBy(std::shared_ptr
&table, std::vector>> aggregations; aggregations.reserve(aggregate_cols.size()); for (size_t i = 0; i < aggregate_cols.size(); i++) { - aggregations.emplace_back(aggregate_cols[i], std::make_shared(aggregate_ops[i])); + aggregations + .emplace_back(aggregate_cols[i], compute::MakeAggregationOpFromID(aggregate_ops[i])); } return HashGroupBy(table, idx_cols, aggregations, output); diff --git a/cpp/src/cylon/join/sort_join.cpp b/cpp/src/cylon/join/sort_join.cpp index 8766c59b7..e5717d8c6 100644 --- a/cpp/src/cylon/join/sort_join.cpp +++ b/cpp/src/cylon/join/sort_join.cpp @@ -379,7 +379,7 @@ Status do_multi_index_sorted_join(const std::shared_ptr &left_tab, auto t22 = std::chrono::high_resolution_clock::now(); - LOG(INFO) << "Combine chunks time : " + LOG(INFO) << "CombineBeforeShuffle chunks time : " << std::chrono::duration_cast(t22 - t11).count(); // create sorter and do index sort diff --git a/cpp/src/cylon/mapreduce/mapreduce.cpp b/cpp/src/cylon/mapreduce/mapreduce.cpp new file mode 100644 index 000000000..e86ff930d --- /dev/null +++ b/cpp/src/cylon/mapreduce/mapreduce.cpp @@ -0,0 +1,1028 @@ +// +// Created by niranda on 11/22/21. +// + +#include +#include "cylon/mapreduce/mapreduce.hpp" +#include "cylon/util/macros.hpp" + +#include +#include + +namespace cylon { +namespace mapred { + +template +void CombineVisit(const std::shared_ptr &value_col, const int64_t *local_group_ids, + Visitor &&visitor) { + using T = typename ArrowT::c_type; + int64_t i = 0; + arrow::VisitArrayDataInline(*value_col->data(), + [&](const T &val) { + int64_t gid = local_group_ids[i]; + visitor(val, gid); + i++; + }, + [&]() { i++; }); +} + +Status AllocateArray(arrow::MemoryPool *pool, const std::shared_ptr &type, + int64_t length, std::shared_ptr *array) { + std::unique_ptr builder; + RETURN_CYLON_STATUS_IF_ARROW_FAILED(arrow::MakeBuilder(pool, type, &builder)); + RETURN_CYLON_STATUS_IF_ARROW_FAILED(builder->AppendEmptyValues(length)); + CYLON_ASSIGN_OR_RAISE(*array, builder->Finish()) + return Status::OK(); +} + +Status AllocateArrays(arrow::MemoryPool *pool, const arrow::DataTypeVector &intermediate_types, + int64_t length, arrow::ArrayVector *arrays) { + arrays->clear(); + arrays->reserve(intermediate_types.size()); + for (const auto &type: intermediate_types) { + std::shared_ptr arr; + RETURN_CYLON_STATUS_IF_FAILED(AllocateArray(pool, type, length, &arr)); + arrays->push_back(std::move(arr)); + } + return Status::OK(); +} + +size_t MapReduceKernel::num_arrays() const { return intermediate_types().size(); } + +template class CombineOp> +struct MapReduceKernelImpl1D : public MapReduceKernel { + using T = typename ArrowT::c_type; + + public: + explicit MapReduceKernelImpl1D(const std::shared_ptr &in_type) + : out_types({in_type}) {} + + const std::shared_ptr &output_type() const override { return out_types[0]; } + const arrow::DataTypeVector &intermediate_types() const override { return out_types; } + + void Init(arrow::MemoryPool *pool, compute::KernelOptions *options) override { + CYLON_UNUSED(options); + this->pool_ = pool; + } + + Status CombineLocally(const std::shared_ptr &value_col, + const std::shared_ptr &local_group_ids, + int64_t local_num_groups, + arrow::ArrayVector *combined_results) const override { + std::shared_ptr arr; + RETURN_CYLON_STATUS_IF_FAILED(AllocateArray(pool_, out_types[0], local_num_groups, &arr)); + + auto *g_ids = local_group_ids->data()->template GetValues(1); + T *res = arr->data()->template GetMutableValues(1); + CombineVisit(value_col, g_ids, + [&](const T &val, int64_t gid) 
{ + res[gid] = CombineOp::Call(res[gid], val); + }); + + *combined_results = {std::move(arr)}; + return Status::OK(); + } + + Status ReduceShuffledResults(const arrow::ArrayVector &combined_results, + const std::shared_ptr &local_group_ids, + const std::shared_ptr &local_group_indices, + int64_t local_num_groups, + arrow::ArrayVector *reduced_results) const override { + CYLON_UNUSED(local_group_indices); + return CombineLocally(combined_results[0], local_group_ids, + local_num_groups, reduced_results); + } + + Status Finalize(const arrow::ArrayVector &combined_results, + std::shared_ptr *output) const override { + *output = combined_results[0]; + return Status::OK(); + } + + protected: + const arrow::DataTypeVector out_types; + arrow::MemoryPool *pool_ = nullptr; +}; + +template +struct SumFunction { + static T Call(const T &x, const T &y) { return x + y; }; +}; +template +struct SumKernelImpl : public MapReduceKernelImpl1D { + explicit SumKernelImpl(const std::shared_ptr &type) + : MapReduceKernelImpl1D(type) {} + std::string name() const override { return "sum"; } +}; + +template +struct MinFunction { + static T Call(const T &x, const T &y) { return std::min(x, y); }; +}; +template +struct MinKernelImpl : public MapReduceKernelImpl1D { + explicit MinKernelImpl(const std::shared_ptr &type) + : MapReduceKernelImpl1D(type) {} + std::string name() const override { return "min"; } +}; + +template +struct MaxFunction { + static T Call(const T &x, const T &y) { return std::max(x, y); }; +}; +template +struct MaxKernelImpl : public MapReduceKernelImpl1D { + explicit MaxKernelImpl(const std::shared_ptr &type) + : MapReduceKernelImpl1D(type) {} + std::string name() const override { return "max"; } +}; + +struct CountKernelImpl : public MapReduceKernel { + const arrow::DataTypeVector out_types{arrow::int64()}; + arrow::MemoryPool *pool_ = nullptr; + + std::string name() const override { return "count"; } + const std::shared_ptr &output_type() const override { return out_types[0]; } + const arrow::DataTypeVector &intermediate_types() const override { return out_types; } + + void Init(arrow::MemoryPool *pool, compute::KernelOptions *options) override { + CYLON_UNUSED(options); + this->pool_ = pool; + } + + Status CombineLocally(const std::shared_ptr &value_col, + const std::shared_ptr &local_group_ids, + int64_t local_num_groups, + arrow::ArrayVector *combined_results) const override { + std::shared_ptr arr; + RETURN_CYLON_STATUS_IF_FAILED(AllocateArray(pool_, out_types[0], local_num_groups, &arr)); + + auto *g_ids = local_group_ids->data()->template GetValues(1); + auto *counts = arr->data()->template GetMutableValues(1); + for (int64_t i = 0; i < value_col->length(); i++) { + counts[g_ids[i]]++; + } + + *combined_results = {std::move(arr)}; + return Status::OK(); + } + + Status ReduceShuffledResults(const arrow::ArrayVector &combined_results, + const std::shared_ptr &local_group_ids, + const std::shared_ptr &local_group_indices, + int64_t local_num_groups, + arrow::ArrayVector *reduced_results) const override { + CYLON_UNUSED(local_group_indices); + std::shared_ptr arr; + RETURN_CYLON_STATUS_IF_FAILED(AllocateArray(pool_, out_types[0], local_num_groups, &arr)); + + auto *g_ids = local_group_ids->data()->template GetValues(1); + auto *res = arr->data()->template GetMutableValues(1); + CombineVisit(combined_results[0], g_ids, + [&](const int64_t &val, int64_t gid) { + res[gid] += val; + }); + *reduced_results = {std::move(arr)}; + return Status::OK(); + } + + Status Finalize(const arrow::ArrayVector 
&combined_results, + std::shared_ptr *output) const override { + *output = combined_results[0]; + return Status::OK(); + } +}; + +template +struct MeanKernelImpl : public MapReduceKernel { + using T = typename ArrowT::c_type; + + public: + explicit MeanKernelImpl(const std::shared_ptr &in_type) + : inter_types({in_type, arrow::int64()}) {} + + std::string name() const override { return "mean"; } + const std::shared_ptr &output_type() const override { return inter_types[0]; } + const arrow::DataTypeVector &intermediate_types() const override { return inter_types; } + + void Init(arrow::MemoryPool *pool, compute::KernelOptions *options) override { + CYLON_UNUSED(options); + this->pool_ = pool; + } + + Status CombineLocally(const std::shared_ptr &value_col, + const std::shared_ptr &local_group_ids, + int64_t local_num_groups, + arrow::ArrayVector *combined_results) const override { + RETURN_CYLON_STATUS_IF_FAILED(AllocateArrays(pool_, inter_types, local_num_groups, + combined_results)); + + auto *g_ids = local_group_ids->data()->template GetValues(1); + T *sums = (*combined_results)[0]->data()->template GetMutableValues(1); + auto *counts = (*combined_results)[1]->data()->template GetMutableValues(1); + + CombineVisit(value_col, g_ids, + [&](const T &val, int64_t gid) { + sums[gid] += val; + counts[gid]++; + }); + return Status::OK(); + } + + Status ReduceShuffledResults(const arrow::ArrayVector &combined_results, + const std::shared_ptr &local_group_ids, + const std::shared_ptr &local_group_indices, + int64_t local_num_groups, + arrow::ArrayVector *reduced_results) const override { + assert(combined_results.size() == num_arrays()); + CYLON_UNUSED(local_group_indices); + auto *g_ids = local_group_ids->data()->template GetValues(1); + + std::shared_ptr sum_arr; + RETURN_CYLON_STATUS_IF_FAILED(AllocateArray(pool_, inter_types[0], local_num_groups, &sum_arr)); + auto *sums = sum_arr->data()->template GetMutableValues(1); + CombineVisit(combined_results[0], g_ids, + [&](const T &val, int64_t gid) { sums[gid] += val; }); + + std::shared_ptr cnt_arr; + RETURN_CYLON_STATUS_IF_FAILED(AllocateArray(pool_, inter_types[1], local_num_groups, &cnt_arr)); + auto *counts = cnt_arr->data()->template GetMutableValues(1); + CombineVisit(combined_results[1], g_ids, + [&](const T &val, int64_t gid) { counts[gid] += val; }); + + *reduced_results = {std::move(sum_arr), std::move(cnt_arr)}; + return Status::OK(); + } + + Status Finalize(const arrow::ArrayVector &combined_results, + std::shared_ptr *output) const override { + assert(combined_results.size() == num_arrays()); + + int64_t num_groups = combined_results[0]->length(); + assert(combined_results[1]->length() == num_groups); + + auto *sums = combined_results[0]->data()->template GetMutableValues(1); + const auto *counts = combined_results[1]->data()->template GetValues(1); + + // now make the mean + for (int64_t i = 0; i < num_groups; i++) { + sums[i] = static_cast(sums[i] / counts[i]); + } + + *output = combined_results[0]; + return Status::OK(); + } + + private: + arrow::MemoryPool *pool_ = nullptr; + // {sum_t, count_t} + const arrow::DataTypeVector inter_types; +}; + +template +struct VarKernelImpl : public MapReduceKernel { + using T = typename ArrowT::c_type; + + arrow::MemoryPool *pool_ = nullptr; + int ddof = 0; + // {sum_sq_t, sum_t, count_t} + const arrow::DataTypeVector inter_types{arrow::float64(), arrow::float64(), arrow::int64()}; + + std::string name() const override { return StdDev ? 
"std" : "mean"; } + const std::shared_ptr &output_type() const override { return inter_types[0]; } + const arrow::DataTypeVector &intermediate_types() const override { return inter_types; } + + void Init(arrow::MemoryPool *pool, compute::KernelOptions *options) override { + this->ddof = reinterpret_cast(options)->ddof; + this->pool_ = pool; + } + + Status CombineLocally(const std::shared_ptr &value_col, + const std::shared_ptr &local_group_ids, + int64_t local_num_groups, + arrow::ArrayVector *combined_results) const override { + RETURN_CYLON_STATUS_IF_FAILED(AllocateArrays(pool_, inter_types, local_num_groups, + combined_results)); + auto *g_ids = local_group_ids->data()->template GetValues(1); + auto *sq_sums = (*combined_results)[0]->data()->template GetMutableValues(1); + auto *sums = (*combined_results)[1]->data()->template GetMutableValues(1); + auto *counts = (*combined_results)[2]->data()->template GetMutableValues(1); + + CombineVisit(value_col, g_ids, + [&](const T &val, int64_t gid) { + sq_sums[gid] += (double) (val * val); + sums[gid] += (double) (val); + counts[gid]++; + }); + return Status::OK(); + } + + Status ReduceShuffledResults(const arrow::ArrayVector &combined_results, + const std::shared_ptr &local_group_ids, + const std::shared_ptr &local_group_indices, + int64_t local_num_groups, + arrow::ArrayVector *reduced_results) const override { + assert(combined_results.size() == num_arrays()); + assert(combined_results[0]->type() == inter_types[0]); + assert(combined_results[1]->type() == inter_types[1]); + assert(combined_results[2]->type() == inter_types[2]); + + CYLON_UNUSED(local_group_indices); + auto *g_ids = local_group_ids->data()->template GetValues(1); + + std::shared_ptr sq_sum_arr; + RETURN_CYLON_STATUS_IF_FAILED(AllocateArray(pool_, inter_types[0], local_num_groups, + &sq_sum_arr)); + auto *sq_sums = sq_sum_arr->data()->template GetMutableValues(1); + CombineVisit(combined_results[0], g_ids, + [&](const double &val, int64_t gid) { sq_sums[gid] += val; }); + + std::shared_ptr sum_arr; + RETURN_CYLON_STATUS_IF_FAILED(AllocateArray(pool_, inter_types[1], local_num_groups, &sum_arr)); + auto *sums = sum_arr->data()->template GetMutableValues(1); + CombineVisit(combined_results[1], g_ids, + [&](const double &val, int64_t gid) { sums[gid] += val; }); + + std::shared_ptr cnt_arr; + RETURN_CYLON_STATUS_IF_FAILED(AllocateArray(pool_, inter_types[2], local_num_groups, &cnt_arr)); + auto *counts = cnt_arr->data()->template GetMutableValues(1); + CombineVisit(combined_results[2], g_ids, + [&](const int64_t &val, int64_t gid) { counts[gid] += val; }); + + *reduced_results = {std::move(sq_sum_arr), std::move(sum_arr), std::move(cnt_arr)}; + return Status::OK(); + } + + Status Finalize(const arrow::ArrayVector &combined_results, + std::shared_ptr *output) const override { + assert(combined_results.size() == num_arrays()); + + int64_t num_groups = combined_results[0]->length(); + assert(combined_results[1]->length() == num_groups); + assert(combined_results[2]->length() == num_groups); + + auto *sq_sums = combined_results[0]->data()->template GetMutableValues(1); + auto *sums = combined_results[1]->data()->template GetValues(1); + auto *counts = combined_results[2]->data()->template GetValues(1); + + // now make the mean + int64_t i = 0; + for (; i < num_groups; i++) { + auto count = static_cast(counts[i]); + double sq_sum = sq_sums[i]; + double sum = sums[i]; + if (count > 0) { + double mean = sum / count; + double mean_sum_sq = sq_sum / count; + double var = sq_sum * 
(mean_sum_sq - mean * mean) / (count - ddof); + sq_sums[i] = StdDev ? sqrt(var) : var; + } else { + break; + } + } + + if (i != num_groups) { + return {Code::ExecutionError, + "error occurred during var finalize. idx: " + std::to_string(i)}; + } + *output = combined_results[0]; + return Status::OK(); + } +}; + +// todo: to be supported with arrow v6.0+ +/*struct NuniqueKernelImpl : public MapReduceKernel { + const arrow::DataTypeVector out_types{arrow::int64()}; + arrow::MemoryPool *pool_ = nullptr; + + std::string name() const override { return "nunique"; } + const std::shared_ptr &output_type() const override { return out_types[0]; } + const arrow::DataTypeVector &intermediate_types() const override { return out_types; } + + void Init(arrow::MemoryPool *pool, compute::KernelOptions *options) override { + CYLON_UNUSED(options); + this->pool_ = pool; + } + + Status CombineLocally(const std::shared_ptr &value_col, + const std::shared_ptr &local_group_ids, + int64_t local_num_groups, + arrow::ArrayVector *combined_results) const override { + CYLON_UNUSED(local_group_ids); + CYLON_UNUSED(local_num_groups); + *combined_results = {value_col}; + return Status::OK(); + } + + Status ReduceShuffledResults(const arrow::ArrayVector &combined_results, + const std::shared_ptr &local_group_ids, + const std::shared_ptr &local_group_indices, + int64_t local_num_groups, + arrow::ArrayVector *reduced_results) const override { + CYLON_UNUSED(combined_results); + CYLON_UNUSED(local_group_ids); + CYLON_UNUSED(local_group_indices); + CYLON_UNUSED(local_num_groups); + CYLON_UNUSED(reduced_results); + return {Code::ExecutionError, "Nunique does not support ReduceShuffledResults"}; + } + + Status Finalize(const arrow::ArrayVector &combined_results, + std::shared_ptr *output) const override { + *output = combined_results[0]; + return Status::OK(); + } +};*/ + +Status MapToGroupKernel::Map(const arrow::ArrayVector &arrays, + std::shared_ptr *local_group_ids, + std::shared_ptr *local_group_indices, + int64_t *local_num_groups) const { + const int64_t num_rows = arrays[0]->length(); + if (std::any_of(arrays.begin() + 1, arrays.end(), + [&](const auto &arr) { return arr->length() != num_rows; })) { + return {Code::Invalid, "array lengths should be the same"}; + } + + // if empty, return + if (num_rows == 0) { + CYLON_ASSIGN_OR_RAISE(*local_group_ids, arrow::MakeArrayOfNull(arrow::int64(), 0, pool_)) + *local_group_indices = *local_group_ids; // copy empty array + *local_num_groups = 0; + return Status::OK(); + } + + std::unique_ptr comp; + RETURN_CYLON_STATUS_IF_FAILED(TableRowIndexEqualTo::Make(arrays, &comp)); + + std::unique_ptr hash; + RETURN_CYLON_STATUS_IF_FAILED(TableRowIndexHash::Make(arrays, &hash)); + + ska::bytell_hash_map + hash_map(num_rows, *hash, *comp); + + arrow::Int64Builder group_ids_build(pool_); + RETURN_CYLON_STATUS_IF_ARROW_FAILED(group_ids_build.Reserve(num_rows)); + + arrow::Int64Builder filter_build(pool_); + RETURN_CYLON_STATUS_IF_ARROW_FAILED((filter_build.Reserve(num_rows))); + + int64_t unique = 0; + for (int64_t i = 0; i < num_rows; i++) { + const auto &res = hash_map.emplace(i, unique); + if (res.second) { // this was a unique group + group_ids_build.UnsafeAppend(unique); + unique++; + filter_build.UnsafeAppend(i); + } else { + group_ids_build.UnsafeAppend(res.first->second); + } + } + + RETURN_CYLON_STATUS_IF_ARROW_FAILED((group_ids_build.Finish(local_group_ids))); + RETURN_CYLON_STATUS_IF_ARROW_FAILED((filter_build.Finish(local_group_indices))); + *local_num_groups = unique; + return 
Status::OK(); +} + +Status MapToGroupKernel::Map(const std::shared_ptr &table, + const std::vector &key_cols, + std::shared_ptr *local_group_ids, + std::shared_ptr *local_group_indices, + int64_t *local_num_groups) const { + arrow::ArrayVector arrays; + arrays.reserve(key_cols.size()); + + for (int i: key_cols) { + const auto &col = table->column(i); + if (col->num_chunks() > 1) { + return {Code::Invalid, "MapToGroupKernel doesnt support chunks"}; + } + arrays.push_back(col->chunk(0)); + } + + return Map(arrays, local_group_ids, local_group_indices, local_num_groups); +} + +template +std::unique_ptr MakeMapReduceKernelImpl( + const std::shared_ptr &type, compute::AggregationOpId reduce_op) { + switch (reduce_op) { + case compute::SUM: return std::make_unique>(type); + case compute::MIN:return std::make_unique>(type); + case compute::MAX:return std::make_unique>(type); + case compute::COUNT:return std::make_unique(); + case compute::MEAN:return std::make_unique>(type); + case compute::VAR: return std::make_unique>(); + case compute::STDDEV:return std::make_unique>(); + case compute::NUNIQUE:break; + case compute::QUANTILE:break; + } + return nullptr; +} + +std::unique_ptr MakeMapReduceKernel(const std::shared_ptr &type, + compute::AggregationOpId reduce_op) { + switch (type->id()) { + case arrow::Type::NA:break; + case arrow::Type::BOOL:break; + case arrow::Type::UINT8: return MakeMapReduceKernelImpl(type, reduce_op); + case arrow::Type::INT8:return MakeMapReduceKernelImpl(type, reduce_op); + case arrow::Type::UINT16:return MakeMapReduceKernelImpl(type, reduce_op); + case arrow::Type::INT16:return MakeMapReduceKernelImpl(type, reduce_op); + case arrow::Type::UINT32:return MakeMapReduceKernelImpl(type, reduce_op); + case arrow::Type::INT32:return MakeMapReduceKernelImpl(type, reduce_op); + case arrow::Type::UINT64:return MakeMapReduceKernelImpl(type, reduce_op); + case arrow::Type::INT64:return MakeMapReduceKernelImpl(type, reduce_op); + case arrow::Type::FLOAT:return MakeMapReduceKernelImpl(type, reduce_op);; + case arrow::Type::DOUBLE:return MakeMapReduceKernelImpl(type, reduce_op);; + case arrow::Type::DATE32:return MakeMapReduceKernelImpl(type, reduce_op); + case arrow::Type::DATE64:return MakeMapReduceKernelImpl(type, reduce_op); + case arrow::Type::TIMESTAMP: + return MakeMapReduceKernelImpl(type, + reduce_op); + case arrow::Type::TIME32:return MakeMapReduceKernelImpl(type, reduce_op); + case arrow::Type::TIME64:return MakeMapReduceKernelImpl(type, reduce_op); + case arrow::Type::HALF_FLOAT:break; + case arrow::Type::STRING:break; + case arrow::Type::LARGE_STRING:break; + case arrow::Type::BINARY:break; + case arrow::Type::LARGE_BINARY:break; + case arrow::Type::FIXED_SIZE_BINARY:break; + default: break; + } + return nullptr; +} + +using AggKernelVector = std::vector>>; + +Status MakeAggKernels(arrow::MemoryPool *pool, + const std::shared_ptr &schema, + const AggOpVector &aggs, + AggKernelVector *agg_kernels) { + agg_kernels->reserve(aggs.size()); + for (const auto &agg: aggs) { + const auto &type = schema->field(agg.first)->type(); + auto kern = MakeMapReduceKernel(type, agg.second->id()); + + if (kern) { + // initialize kernel + kern->Init(pool, agg.second->options()); + agg_kernels->emplace_back(agg.first, std::move(kern)); + } else { + return {Code::NotImplemented, "Unsupported reduce kernel type " + type->ToString()}; + } + } + return Status::OK(); +} + +Status MakeOutputSchema(const std::shared_ptr &cur_schema, + const std::vector &key_cols, const AggKernelVector &agg_kernels, + 
std::shared_ptr *out_schema, + bool use_intermediate_types = false) { + arrow::SchemaBuilder schema_builder; + // add key fields + for (int i: key_cols) { + RETURN_CYLON_STATUS_IF_ARROW_FAILED(schema_builder.AddField(cur_schema->field(i))); + } + + // add agg fields --> out field name = "name" + "op_name" + for (const auto &agg: agg_kernels) { + const auto &kern = agg.second; + if (use_intermediate_types) { + const auto &types = kern->intermediate_types(); + for (size_t i = 0; i < types.size(); i++) { + auto f = arrow::field( + cur_schema->field(agg.first)->name() + "_" + kern->name() + "_" + std::to_string(i), + types[i]); + RETURN_CYLON_STATUS_IF_ARROW_FAILED(schema_builder.AddField(f)); + } + } else { // using output types + auto f = arrow::field(cur_schema->field(agg.first)->name() + "_" + kern->name(), + kern->output_type()); + RETURN_CYLON_STATUS_IF_ARROW_FAILED(schema_builder.AddField(f)); + } + } + + CYLON_ASSIGN_OR_RAISE(*out_schema, schema_builder.Finish()) + return Status::OK(); +} + +Status MapToGroups(arrow::compute::ExecContext *exec_ctx, + const std::shared_ptr &atable, + const std::vector &key_cols, + const MapToGroupKernel *mapper, + std::shared_ptr *group_ids, + std::shared_ptr *group_indices, + int64_t *num_groups, + arrow::ChunkedArrayVector *out_arrays) { + RETURN_CYLON_STATUS_IF_FAILED(mapper->Map(atable, key_cols, group_ids, group_indices, + num_groups)); + assert(*num_groups == (*group_indices)->length()); + assert(atable->num_rows() == (*group_ids)->length()); + + // take group_indices from the columns to create out key columns + for (int k: key_cols) { + CYLON_ASSIGN_OR_RAISE(auto arr, + arrow::compute::Take(atable->column(k)->chunk(0), *group_indices, + arrow::compute::TakeOptions::NoBoundsCheck(), + exec_ctx)) + out_arrays->push_back(std::make_shared(arr.make_array())); + } + + return Status::OK(); +} + +Status LocalAggregate(const std::shared_ptr &ctx, + const std::shared_ptr &atable, + const std::vector &key_cols, + const AggKernelVector &agg_kernels, + std::shared_ptr *output, + const MapToGroupKernel *mapper) { + auto pool = ToArrowPool(ctx); + arrow::compute::ExecContext exec_ctx(pool); + const auto &cur_schema = atable->schema(); + + // make out_schema + std::shared_ptr out_schema; + RETURN_CYLON_STATUS_IF_FAILED(MakeOutputSchema(cur_schema, key_cols, agg_kernels, &out_schema)); + + if (atable->num_rows() == 0) { + // return empty table with proper aggregate types + RETURN_CYLON_STATUS_IF_ARROW_FAILED(util::CreateEmptyTable(out_schema, output, pool)); + return Status::OK(); + } + + std::vector> out_arrays; + out_arrays.reserve(out_schema->num_fields()); + + // map to groups + std::shared_ptr group_ids, group_indices; + int64_t num_groups; + + RETURN_CYLON_STATUS_IF_FAILED(MapToGroups(&exec_ctx, atable, key_cols, mapper, &group_ids, + &group_indices, &num_groups, &out_arrays)); + + // make the value columns --> only use MapReduceKernel.CombineBeforeShuffle, and Finalize + for (const auto &p: agg_kernels) { + int val_col = p.first; + const auto &kern = p.second; + + // val col + if (atable->column(val_col)->num_chunks() > 1) { + return {Code::Invalid, "Aggregates do not support chunks"}; + } + const auto &val_arr = atable->column(val_col)->chunk(0); + + arrow::ArrayVector res; + RETURN_CYLON_STATUS_IF_FAILED( + kern->CombineLocally(val_arr, group_ids, num_groups, &res)); + std::shared_ptr out_arr; + RETURN_CYLON_STATUS_IF_FAILED(kern->Finalize(res, &out_arr)); + + out_arrays.push_back(std::make_shared(std::move(out_arr))); + } + + // check if the types match 
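+ // note: every aggregated column appended above must match the type declared in out_schema
+ // (e.g. count_ columns are int64, while sum_/min_/max_ keep the value column's type);
+ // the debug asserts below verify this before the schema and arrays are combined into the output table.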
+ assert(out_arrays.size() == (size_t) out_schema->num_fields()); + for (int i = 0; i < out_schema->num_fields(); i++) { + assert(out_schema->field(i)->type()->Equals(*out_arrays[i]->type())); + } + + *output = arrow::Table::Make(std::move(out_schema), std::move(out_arrays)); + return Status::OK(); +} + +Status DistAggregate(const std::shared_ptr &ctx, + const std::shared_ptr &atable, + const std::vector &key_cols, + const AggKernelVector &agg_kernels, + std::shared_ptr *output, + const MapToGroupKernel *mapper) { + auto pool = ToArrowPool(ctx); + arrow::compute::ExecContext exec_ctx(pool); + const auto &cur_schema = atable->schema(); + + // make intermediate_schema + std::shared_ptr interim_schema; + RETURN_CYLON_STATUS_IF_FAILED( + MakeOutputSchema(cur_schema, key_cols, agg_kernels, &interim_schema, true)); + + std::vector> interim_arrays; + interim_arrays.reserve(interim_schema->num_fields()); + + if (atable->num_rows() == 0) { + // if table empty, push empty arrays to the interim_arrays + for (const auto &f: interim_schema->fields()) { + CYLON_ASSIGN_OR_RAISE(auto arr, arrow::MakeArrayOfNull(f->type(), 0, pool)) + interim_arrays.push_back(std::make_shared(std::move(arr))); + } + } else { + // map to groups + std::shared_ptr group_ids, group_indices; + int64_t num_groups; + RETURN_CYLON_STATUS_IF_FAILED(MapToGroups(&exec_ctx, atable, key_cols, mapper, &group_ids, + &group_indices, &num_groups, &interim_arrays)); + + // make the interim value columns --> only use MapReduceKernel.CombineBeforeShuffle + for (const auto &p: agg_kernels) { + int val_col = p.first; + const auto &kern = p.second; + + // val col + if (atable->column(val_col)->num_chunks() > 1) { + return {Code::Invalid, "Aggregates do not support chunks"}; + } + const auto &val_arr = atable->column(val_col)->chunk(0); + + arrow::ArrayVector results; + RETURN_CYLON_STATUS_IF_FAILED( + kern->CombineLocally(val_arr, group_ids, num_groups, &results)); + + for (auto &&arr: results) { + interim_arrays.push_back(std::make_shared(std::move(arr))); + } + } + + // check if the types match + assert(interim_arrays.size() == (size_t) interim_schema->num_fields()); + for (int i = 0; i < interim_schema->num_fields(); i++) { + assert(interim_schema->field(i)->type()->Equals(*interim_arrays[i]->type())); + } + + // clear values + group_ids.reset(); + group_indices.reset(); + } + + // now shuffle the interim results + std::shared_ptr
shuffle_table; + RETURN_CYLON_STATUS_IF_FAILED(Table::FromArrowTable(ctx, + arrow::Table::Make(std::move(interim_schema), + std::move(interim_arrays)), + shuffle_table)); + // key cols are at the front. So, create a new vector + std::vector shuffle_keys(key_cols.size()); + std::iota(shuffle_keys.begin(), shuffle_keys.end(), 0); + // shuffle and update + RETURN_CYLON_STATUS_IF_FAILED(Shuffle(shuffle_table, shuffle_keys, shuffle_table)); + const auto &shuffled_atable = shuffle_table->get_table(); + + // make output schema + std::shared_ptr out_schema; + RETURN_CYLON_STATUS_IF_FAILED(MakeOutputSchema(cur_schema, key_cols, agg_kernels, &out_schema)); + + // now create new set of columns + std::vector> out_arrays; + out_arrays.reserve(out_schema->num_fields()); + + if (shuffled_atable->num_rows() == 0) { + // if table empty, push empty arrays to the out_arrays + for (const auto &f: out_schema->fields()) { + CYLON_ASSIGN_OR_RAISE(auto arr, arrow::MakeArrayOfNull(f->type(), 0, pool)) + out_arrays.push_back(std::make_shared(std::move(arr))); + } + } else { + // map to groups + std::shared_ptr group_ids, group_indices; + int64_t num_groups; + RETURN_CYLON_STATUS_IF_FAILED( + MapToGroups(&exec_ctx, shuffled_atable, shuffle_keys, mapper, + &group_ids, &group_indices, &num_groups, &out_arrays)); + + // make the interim value columns --> only use MapReduceKernel.Init, ReduceAfterShuffle and Finalize + size_t col_offset = shuffle_keys.size(); + for (const auto &p: agg_kernels) { + const auto &kern = p.second; + + // recreate combined columns vector + arrow::ArrayVector combined_results; + combined_results.reserve(kern->num_arrays()); + for (size_t i = 0; i < kern->num_arrays(); i++) { + assert (shuffled_atable->column(col_offset + i)->num_chunks() == 1); + combined_results.push_back(shuffled_atable->column((int) (col_offset + i))->chunk(0)); + } + col_offset += kern->num_arrays(); + + arrow::ArrayVector results; + RETURN_CYLON_STATUS_IF_FAILED( + kern->ReduceShuffledResults(combined_results, group_ids, group_indices, num_groups, + &results)); + std::shared_ptr out_arr; + RETURN_CYLON_STATUS_IF_FAILED(kern->Finalize(results, &out_arr)); + + out_arrays.push_back(std::make_shared(std::move(out_arr))); + } + } + + // check if the types match + assert(out_arrays.size() == (size_t) out_schema->num_fields()); + for (int i = 0; i < out_schema->num_fields(); i++) { + assert(out_schema->field(i)->type()->Equals(*out_arrays[i]->type())); + } + *output = arrow::Table::Make(std::move(out_schema), std::move(out_arrays)); + return Status::OK(); +} + +// todo test this with new kernels +Status ShuffleTable(const std::shared_ptr &ctx, + const std::shared_ptr &atable, + const std::vector &key_cols, const AggKernelVector &agg_kernels, + std::shared_ptr
*shuffled, std::vector *new_key_cols, + AggKernelVector *new_agg_kernels) { + arrow::FieldVector fields; + arrow::ChunkedArrayVector arrays; + + // first take the key columns + for (int k: key_cols) { + arrays.push_back(atable->column(k)); + fields.push_back(atable->field(k)); + } + + new_key_cols->resize(key_cols.size()); + std::iota(new_key_cols->begin(), new_key_cols->end(), 0); + + new_agg_kernels->reserve(agg_kernels.size()); + + std::unordered_map val_cols; // original col_id -> new col_id + for (const auto &agg: agg_kernels) { + int original_col_id = agg.first; + const auto &res = val_cols.emplace(original_col_id, key_cols.size() + val_cols.size()); + + if (res.second) { // this is a unique col_id + arrays.push_back(atable->column(original_col_id)); + fields.push_back(atable->field(original_col_id)); + } + + int new_col_id = res.first->second; + new_agg_kernels->emplace_back(new_col_id, agg.second); + } + + auto shuffling_atable = arrow::Table::Make(std::make_shared(std::move(fields)), + std::move(arrays)); + auto shuffling_table = std::make_shared
(ctx, std::move(shuffling_atable)); + return Shuffle(shuffling_table, *new_key_cols, *shuffled); +} + +// todo test this with new kernels +Status DistAggregateSingleStage(const std::shared_ptr &ctx, + const std::shared_ptr &atable, + const std::vector &key_cols, + const AggKernelVector &agg_kernels, + std::shared_ptr *output, + const MapToGroupKernel *mapper) { + auto pool = ToArrowPool(ctx); + arrow::compute::ExecContext exec_ctx(pool); + + std::vector new_key_cols; + AggKernelVector new_agg_kernels; + std::shared_ptr
shuffled_table; + RETURN_CYLON_STATUS_IF_FAILED(ShuffleTable(ctx, atable, key_cols, agg_kernels, + &shuffled_table, &new_key_cols, &new_agg_kernels)); + const auto &shuffled_atable = shuffled_table->get_table(); + const auto &shuffled_schema = atable->schema(); + + // make output schema + std::shared_ptr out_schema; + RETURN_CYLON_STATUS_IF_FAILED(MakeOutputSchema(shuffled_schema, new_key_cols, new_agg_kernels, + &out_schema)); + + // now create new set of columns + std::vector> out_arrays; + out_arrays.reserve(out_schema->num_fields()); + + if (shuffled_atable->num_rows() == 0) { + // if table empty, push empty arrays to the out_arrays + for (const auto &f: out_schema->fields()) { + CYLON_ASSIGN_OR_RAISE(auto arr, arrow::MakeArrayOfNull(f->type(), 0, pool)) + out_arrays.push_back(std::make_shared(std::move(arr))); + } + } else { + // map to groups + std::shared_ptr group_ids, group_indices; + int64_t num_groups; + RETURN_CYLON_STATUS_IF_FAILED( + MapToGroups(&exec_ctx, shuffled_atable, new_key_cols, mapper, &group_ids, &group_indices, + &num_groups, &out_arrays)); + + // make the interim value columns --> only use MapReduceKernel.CombineLocally and Finalize + for (const auto &new_agg_kernel: new_agg_kernels) { + int val_col = new_agg_kernel.first; + const auto &kern = new_agg_kernel.second; + + assert(kern->num_arrays() == 1); + const auto &val_arr = shuffled_atable->column(val_col)->chunk(0); + + arrow::ArrayVector results; + RETURN_CYLON_STATUS_IF_FAILED(kern->CombineLocally(val_arr, group_ids, num_groups, &results)); + + assert(results.size() == 1); + std::shared_ptr out_arr; + RETURN_CYLON_STATUS_IF_FAILED(kern->Finalize(results, &out_arr)); + + out_arrays.push_back(std::make_shared(std::move(out_arr))); + } + } + + // check if the types match + assert(out_arrays.size() == (size_t) out_schema->num_fields()); + for (int i = 0; i < out_schema->num_fields(); i++) { + assert(out_schema->field(i)->type()->Equals(*out_arrays[i]->type())); + } + *output = arrow::Table::Make(std::move(out_schema), std::move(out_arrays)); + return Status::OK(); +} + +Status MapredHashGroupBy(const std::shared_ptr
&table, const std::vector &key_cols, + const AggOpVector &aggs, std::shared_ptr
*output, + const std::unique_ptr &mapper) { + const auto &ctx = table->GetContext(); + auto pool = ToArrowPool(ctx); + const auto &atable = table->get_table(); + + AggKernelVector agg_kernels; + RETURN_CYLON_STATUS_IF_FAILED(MakeAggKernels(pool, atable->schema(), aggs, &agg_kernels)); + + if (ctx->GetWorldSize() == 1) { // if serial execution, just perform local aggregate. + std::shared_ptr out_table; + RETURN_CYLON_STATUS_IF_FAILED(LocalAggregate(ctx, atable, key_cols, agg_kernels, + &out_table, mapper.get())); + return Table::FromArrowTable(ctx, std::move(out_table), *output); + } + + // distributed execution + + // there are 2 types of distributed kernels. + // single stage - bypass local combine prior to the shuffle + // dual stage - local combine + shuffle + local combine + + // push the single_stage_reduction kernels to the end of the vector + auto single_stage_kernels_start + = std::partition(agg_kernels.begin(), agg_kernels.end(), + [](const std::pair> &k_pair) { + return !k_pair.second->single_stage_reduction(); + }); + + arrow::ChunkedArrayVector final_arrays; + arrow::SchemaBuilder schema_builder; + + size_t dual_stage_kernels = std::distance(agg_kernels.begin(), single_stage_kernels_start); + size_t single_stage_kernels = std::distance(single_stage_kernels_start, agg_kernels.end()); + + if (dual_stage_kernels) { // i.e. there are some dual stage kernels + std::shared_ptr temp_table; + AggKernelVector kernels; + std::move(agg_kernels.begin(), single_stage_kernels_start, std::back_inserter(kernels)); + + RETURN_CYLON_STATUS_IF_FAILED(DistAggregate(ctx, atable, key_cols, kernels, + &temp_table, mapper.get())); + + if (single_stage_kernels == 0) { // no single stage kernels, return temp_table + return Table::FromArrowTable(ctx, std::move(temp_table), *output); + } else { + final_arrays = temp_table->columns(); // copy columns vector + RETURN_CYLON_STATUS_IF_ARROW_FAILED(schema_builder.AddSchema(temp_table->schema())); + } + } + + if (single_stage_kernels_start != agg_kernels.end()) { + std::shared_ptr temp_table; + AggKernelVector kernels; + std::move(single_stage_kernels_start, agg_kernels.end(), std::back_inserter(kernels)); + RETURN_CYLON_STATUS_IF_FAILED(DistAggregateSingleStage(ctx, atable, key_cols, kernels, + &temp_table, mapper.get())); + + if (dual_stage_kernels == 0) { // there are only single_stage_kernels + return Table::FromArrowTable(ctx, std::move(temp_table), *output); + } else { + assert(final_arrays[0]->length() == temp_table->num_rows()); + // skip key columns + for (int i = (int) key_cols.size(); i < temp_table->num_columns(); i++) { + final_arrays.push_back(temp_table->column(i)); + RETURN_CYLON_STATUS_IF_ARROW_FAILED(schema_builder.AddField(temp_table->field(i))); + } + } + } + + CYLON_ASSIGN_OR_RAISE(auto schema, schema_builder.Finish()) + auto out_table = arrow::Table::Make(std::move(schema), std::move(final_arrays)); + return Table::FromArrowTable(ctx, std::move(out_table), *output); +} + +Status MapredHashGroupBy(const std::shared_ptr
&table, const std::vector &key_cols, + const AggOpIdVector &aggs, std::shared_ptr
*output) { + AggOpVector op_vector; + op_vector.reserve(aggs.size()); + + for (auto p: aggs) { + auto op = compute::MakeAggregationOpFromID(p.second); + if (!op) { + return {Code::Invalid, "Unable to create op for op_id " + std::to_string(p.second)}; + } + op_vector.emplace_back(p.first, std::move(op)); + } + return MapredHashGroupBy(table, key_cols, op_vector, output); +} + +} +} diff --git a/cpp/src/cylon/mapreduce/mapreduce.hpp b/cpp/src/cylon/mapreduce/mapreduce.hpp new file mode 100644 index 000000000..ea342c682 --- /dev/null +++ b/cpp/src/cylon/mapreduce/mapreduce.hpp @@ -0,0 +1,147 @@ +// +// Created by niranda on 11/22/21. +// + +#ifndef CYLON_CPP_SRC_CYLON_MAPREDUCE_MAPREDUCE_HPP_ +#define CYLON_CPP_SRC_CYLON_MAPREDUCE_MAPREDUCE_HPP_ + +#include +#include + +#include "cylon/table.hpp" +#include "cylon/compute/aggregate_kernels.hpp" + +namespace cylon { +namespace mapred { + +struct MapToGroupKernel { + public: + explicit MapToGroupKernel(arrow::MemoryPool *pool = arrow::default_memory_pool()) : pool_(pool) {} + + virtual ~MapToGroupKernel() = default; + + /** + * Map each row to a unique group id in the range [0, local_num_groups). This is a local + * operation. + * @param arrays + * @param local_group_ids unique group id (length = num rows) + * @param local_group_indices index of each unique group id (length = local_num_groups) + * @param local_num_groups number of unique (local) groups + * @return + */ + virtual Status Map(const arrow::ArrayVector &arrays, + std::shared_ptr *local_group_ids, + std::shared_ptr *local_group_indices, + int64_t *local_num_groups) const; + + Status Map(const std::shared_ptr &table, + const std::vector &key_cols, + std::shared_ptr *local_group_ids, + std::shared_ptr *local_group_indices, + int64_t *local_num_groups) const; + + private: + arrow::MemoryPool *pool_; +}; + +/** + * Reduce an array is a distributed fashion. It is done in the following stages. + * 1. MapToGroups: Calculate group_ids for value_col + * 2. CombineLocally: Combine value_col locally based on group_ids (which creates an intermediate array vector) + * 3. Shuffle: Shuffle a temp table with intermediate results + * 4. MapToGroups: Calculate group_ids for shuffled intermediate results + * 5. ReduceShuffledResults: Reduce shuffled intermediate results (which creates a reduced array vector) + * 6. Finalize: Finalize the reduced arrays + * + * ex: take `mean` operation + * 1. Calculate group_ids for value_col + * 2. Locally calculate sum and count for each group_id (intermediate_arrays = {sums, cnts}) + * 3. Shuffle intermediate_array + * 4. Calculate group_ids for shuffled intermediate results + * 5. Reduce shuffled sums and counts individually (reduced_arrays = {reduced_sums, reduced_cnts}) + * 6. output = divide(reduced_sum/ reduced_cnts) + * + * In a serial execution mode, this will be simplified into following stages. + * 1. MapToGroups: Calculate group_ids for value_col + * 2. CombineLocally: Combine value_col locally based on group_ids (which creates an intermediate array vector) + * 3. Finalize: Finalize the intermediate arrays + */ +struct MapReduceKernel { + public: + virtual ~MapReduceKernel() = default; + + virtual void Init(arrow::MemoryPool *pool, compute::KernelOptions *options) = 0; + + /** + * Combine `value_col` array locally based on the group_id, and push intermediate results to + * `combined_results` array vector. 
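+ * For example, the `mean` kernel emits two arrays here: per-group sums and per-group
+ * counts, each of length `local_num_groups`, while single-array kernels such as `sum`
+ * emit just the combined values.
+ *
+ * A rough usage sketch of the full kernel lifecycle (error handling omitted; `pool`,
+ * `values`, `group_ids`, `interim`, etc. are placeholder variables, not part of this API):
+ *
+ *   auto kern = MakeMapReduceKernel(arrow::float64(), compute::MEAN);
+ *   kern->Init(pool, nullptr);                                      // MEAN takes no options
+ *   kern->CombineLocally(values, group_ids, num_groups, &interim);  // interim = {sums, counts}
+ *   // ... shuffle `interim`, then re-map the shuffled rows to groups ...
+ *   kern->ReduceShuffledResults(interim, new_gids, new_gidx, new_ngroups, &reduced);
+ *   kern->Finalize(reduced, &out);                                  // out = sums / counts
+ *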
+ * @param value_col + * @param local_group_ids + * @param local_num_groups + * @param combined_results + * @return + */ + virtual Status CombineLocally(const std::shared_ptr &value_col, + const std::shared_ptr &local_group_ids, + int64_t local_num_groups, + arrow::ArrayVector *combined_results) const = 0; + + /** + * Reduce `combined_results` vector to its finalized array vector based on the new group_ids + * (after being shuffled). + * @param combined_results + * @param local_group_ids + * @param local_group_indices + * @param local_num_groups + * @param reduced_results + * @return + */ + virtual Status ReduceShuffledResults(const arrow::ArrayVector &combined_results, + const std::shared_ptr &local_group_ids, + const std::shared_ptr &local_group_indices, + int64_t local_num_groups, + arrow::ArrayVector *reduced_results) const = 0; + + /** + * Create the final output array + * @param combined_results + * @param output + * @return + */ + virtual Status Finalize(const arrow::ArrayVector &combined_results, + std::shared_ptr *output) const = 0; + + /** + * In distributed mode, some kernel implementations may choose to do a single stage reduction of + * an array. i.e. + * Shuffle --> MapToGroups --> CombineLocally --> Finalize + * Those can simply set this flag. Then the shuffled value column will be forwarded straight to + * the CombineLocally method. + */ + virtual bool single_stage_reduction() const { return false; }; + inline size_t num_arrays() const; + virtual std::string name() const = 0; + virtual const std::shared_ptr &output_type() const = 0; + virtual const arrow::DataTypeVector &intermediate_types() const = 0; +}; + +std::unique_ptr MakeMapReduceKernel(const std::shared_ptr &type, + compute::AggregationOpId reduce_op); + +/** + * Distributed hash groupby using mapreduce approach + */ +using AggOpVector = std::vector>>; +Status MapredHashGroupBy(const std::shared_ptr
&table, const std::vector &key_cols, + const AggOpVector &aggs, std::shared_ptr
*output, + const std::unique_ptr &mapper + = std::make_unique()); + +using AggOpIdVector = std::vector>; +Status MapredHashGroupBy(const std::shared_ptr
&table, const std::vector &key_cols, + const AggOpIdVector &aggs, std::shared_ptr
*output); + +} +} + +#endif //CYLON_CPP_SRC_CYLON_MAPREDUCE_MAPREDUCE_HPP_ diff --git a/cpp/src/cylon/table.cpp b/cpp/src/cylon/table.cpp index 2f0bb84f6..71cd70bb8 100644 --- a/cpp/src/cylon/table.cpp +++ b/cpp/src/cylon/table.cpp @@ -51,12 +51,15 @@ namespace cylon { * @return */ Status PrepareArray(std::shared_ptr &ctx, - const std::shared_ptr &table, const int32_t col_idx, - const std::vector &row_indices, arrow::ArrayVector &array_vector) { + const std::shared_ptr &table, const int32_t col_idx, + const std::vector &row_indices, arrow::ArrayVector &array_vector) { std::shared_ptr destination_col_array; arrow::Status ar_status = - cylon::util::copy_array_by_indices(row_indices, cylon::util::GetChunkOrEmptyArray(table->column(col_idx), 0), - &destination_col_array, cylon::ToArrowPool(ctx)); + cylon::util::copy_array_by_indices(row_indices, + cylon::util::GetChunkOrEmptyArray(table->column(col_idx), + 0), + &destination_col_array, + cylon::ToArrowPool(ctx)); if (ar_status != arrow::Status::OK()) { LOG(FATAL) << "Failed while copying a column to the final table from tables." << ar_status.ToString(); @@ -88,19 +91,20 @@ static inline Status all_to_all_arrow_tables(const std::shared_ptr arrow_callback, schema); // if world size == partitions, simply send paritions based on index - const size_t world_size = (size_t) ctx->GetWorldSize(), num_partitions = partitioned_tables.size(), + const int world_size = ctx->GetWorldSize(), + num_partitions = (int) partitioned_tables.size(), rank = ctx->GetRank(); if (world_size == num_partitions) { - for (size_t i = 0; i < partitioned_tables.size(); i++) { + for (int i = 0; i < num_partitions; i++) { if (i != rank) { all_to_all.insert(partitioned_tables[i], i); } else { received_tables.push_back(partitioned_tables[i]); } } - } else { // divide parititions to world_size potions and send accordingly - for (size_t i = 0; i < partitioned_tables.size(); i++) { - size_t target = i * world_size / num_partitions; + } else { // divide partitions to world_size potions and send accordingly + for (int i = 0; i < num_partitions; i++) { + int target = i * world_size / num_partitions; if (target != rank) { all_to_all.insert(partitioned_tables[i], target); } else { @@ -119,23 +123,16 @@ static inline Status all_to_all_arrow_tables(const std::shared_ptr partitioned_tables.clear();*/ // now we have the final set of tables - LOG(INFO) << "Concatenating tables, Num of tables : " << received_tables.size(); - arrow::Result> concat_res = - arrow::ConcatenateTables(received_tables); - RETURN_CYLON_STATUS_IF_ARROW_FAILED(concat_res.status()); - const auto &final_table = concat_res.ValueOrDie(); - LOG(INFO) << "Done concatenating tables, rows : " << final_table->num_rows(); - - arrow::Result> combine_res = - final_table->CombineChunks(cylon::ToArrowPool(ctx)); - RETURN_CYLON_STATUS_IF_ARROW_FAILED(concat_res.status()); - table_out = combine_res.ValueOrDie(); +// LOG(INFO) << "Concatenating tables, Num of tables : " << received_tables.size(); + CYLON_ASSIGN_OR_RAISE(auto concat, arrow::ConcatenateTables(received_tables)) +// LOG(INFO) << "Done concatenating tables, rows : " << concat->num_rows(); + CYLON_ASSIGN_OR_RAISE(table_out, concat->CombineChunks(cylon::ToArrowPool(ctx))) return Status::OK(); } /** - * output rows order by rank number + * output rows order by rank number */ static inline Status all_to_all_arrow_tables_preserve_order(const std::shared_ptr &ctx, const std::shared_ptr &schema, @@ -213,14 +210,12 @@ static inline Status shuffle_table_by_hashing(const 
std::shared_ptr outPartitions, counts; int no_of_partitions = ctx->GetWorldSize(); - RETURN_CYLON_STATUS_IF_FAILED(MapToHashPartitions(table, - hash_column, - no_of_partitions, - outPartitions, - counts)); + RETURN_CYLON_STATUS_IF_FAILED( + MapToHashPartitions(table, hash_column, no_of_partitions, outPartitions, counts)); std::vector> partitioned_tables; - RETURN_CYLON_STATUS_IF_FAILED(Split(table, no_of_partitions, outPartitions, counts, partitioned_tables)); + RETURN_CYLON_STATUS_IF_FAILED( + Split(table, no_of_partitions, outPartitions, counts, partitioned_tables)); std::shared_ptr schema = table->get_table()->schema(); // we are going to free if retain is set to false @@ -240,18 +235,22 @@ static inline Status shuffle_two_tables_by_hashing(const std::shared_ptr &left_table_out, std::shared_ptr &right_table_out) { - LOG(INFO) << "Shuffling two tables with total rows : " << left_table->Rows() + right_table->Rows(); + LOG(INFO) << "Shuffling two tables with total rows : " + << left_table->Rows() + right_table->Rows(); auto t1 = std::chrono::high_resolution_clock::now(); - RETURN_CYLON_STATUS_IF_FAILED(shuffle_table_by_hashing(ctx, left_table, left_hash_column, left_table_out)); + RETURN_CYLON_STATUS_IF_FAILED( + shuffle_table_by_hashing(ctx, left_table, left_hash_column, left_table_out)); auto t2 = std::chrono::high_resolution_clock::now(); LOG(INFO) << "Left shuffle time : " << std::chrono::duration_cast(t2 - t1).count(); - RETURN_CYLON_STATUS_IF_FAILED(shuffle_table_by_hashing(ctx, right_table, right_hash_column, right_table_out)); + RETURN_CYLON_STATUS_IF_FAILED( + shuffle_table_by_hashing(ctx, right_table, right_hash_column, right_table_out)); auto t3 = std::chrono::high_resolution_clock::now(); - LOG(INFO) << "Right shuffle time : " << std::chrono::duration_cast(t3 - t2).count(); + LOG(INFO) << "Right shuffle time : " + << std::chrono::duration_cast(t3 - t2).count(); return Status::OK(); } @@ -352,11 +351,12 @@ void Table::Print(int row1, int row2, int col1, int col2) const { PrintToOStream(col1, col2, row1, row2, std::cout); } -Status Merge(const std::vector> &ctables, std::shared_ptr
&tableOut) { +Status Merge(const std::vector> &ctables, + std::shared_ptr
&tableOut) { if (!ctables.empty()) { std::vector> tables; tables.reserve(ctables.size()); - for (const auto &t:ctables) { + for (const auto &t: ctables) { if (t->Rows()) { std::shared_ptr arrow; t->ToArrowTable(arrow); @@ -390,7 +390,8 @@ Status Sort(const std::shared_ptr
&table, int sort_column, RETURN_CYLON_STATUS_IF_ARROW_FAILED(util::Duplicate(table_, pool, sorted_table)); } - RETURN_CYLON_STATUS_IF_ARROW_FAILED(cylon::util::SortTable(table_, sort_column, pool, sorted_table, ascending)); + RETURN_CYLON_STATUS_IF_ARROW_FAILED( + util::SortTable(table_, sort_column, pool, sorted_table, ascending)); return Table::FromArrowTable(ctx, sorted_table, out); } @@ -430,10 +431,7 @@ Status DistributedSort(const std::shared_ptr
&table, std::shared_ptr
&output, bool ascending, SortOptions sort_options) { - return DistributedSort(table, - std::vector{sort_column}, - output, - std::vector{ascending}, + return DistributedSort(table, std::vector{sort_column}, output, std::vector{ascending}, sort_options); } @@ -639,7 +637,8 @@ Status Select(const std::shared_ptr
&table, const std::function &first, const std::shared_ptr
&second, std::shared_ptr
&out) { +Status Union(const std::shared_ptr
&first, const std::shared_ptr
&second, + std::shared_ptr
&out) { std::shared_ptr ltab = first->get_table(); std::shared_ptr rtab = second->get_table(); const auto &ctx = first->GetContext(); @@ -676,11 +675,10 @@ Status Union(const std::shared_ptr
&first, const std::shared_ptr
& RETURN_CYLON_STATUS_IF_ARROW_FAILED(mask_builder.Finish(&mask)); const auto &options = arrow::compute::FilterOptions::Defaults(); - const arrow::Result &l_res = arrow::compute::Filter(ltab, mask, options, &exec_context); - RETURN_CYLON_STATUS_IF_ARROW_FAILED(l_res.status()); - + CYLON_ASSIGN_OR_RAISE( + auto l_res, arrow::compute::Filter(ltab, mask, options, &exec_context)); // filtered first table - const std::shared_ptr &f_ltab = l_res.ValueOrDie().table(); + const std::shared_ptr &f_ltab = l_res.table(); // insert second table to the row set mask_builder.Reset(); @@ -695,27 +693,24 @@ Status Union(const std::shared_ptr
&first, const std::shared_ptr
& } RETURN_CYLON_STATUS_IF_ARROW_FAILED(mask_builder.Finish(&mask)); - const arrow::Result &r_res = arrow::compute::Filter(rtab, mask, options, &exec_context); - RETURN_CYLON_STATUS_IF_ARROW_FAILED(r_res.status()); - + CYLON_ASSIGN_OR_RAISE( + auto r_res, arrow::compute::Filter(rtab, mask, options, &exec_context)) // filtered second table - const std::shared_ptr &f_rtab = r_res.ValueOrDie().table(); + const std::shared_ptr &f_rtab = r_res.table(); // concat filtered tables - const auto - &concat_res = arrow::ConcatenateTables({f_ltab, f_rtab}, arrow::ConcatenateTablesOptions::Defaults(), pool); - RETURN_CYLON_STATUS_IF_ARROW_FAILED(concat_res.status()); - + CYLON_ASSIGN_OR_RAISE(auto concat, + arrow::ConcatenateTables({f_ltab, f_rtab}, + arrow::ConcatenateTablesOptions::Defaults(), + pool)) // combine chunks - auto merge_res = concat_res.ValueOrDie()->CombineChunks(); - RETURN_CYLON_STATUS_IF_ARROW_FAILED(merge_res.status()); + CYLON_ASSIGN_OR_RAISE(auto merge, concat->CombineChunks()) - out = std::make_shared(ctx, merge_res.ValueOrDie()); - return Status::OK(); + return Table::FromArrowTable(ctx, std::move(merge), out); } Status Subtract(const std::shared_ptr
&first, const std::shared_ptr
&second, - std::shared_ptr
&out) { + std::shared_ptr
&out) { std::shared_ptr ltab = first->get_table(); std::shared_ptr rtab = second->get_table(); const auto &ctx = first->GetContext(); @@ -769,8 +764,8 @@ Status Subtract(const std::shared_ptr
&first, const std::shared_ptr
&first, - const std::shared_ptr
&second, - std::shared_ptr
&out) { + const std::shared_ptr
&second, + std::shared_ptr
&out) { std::shared_ptr ltab = first->get_table(); std::shared_ptr rtab = second->get_table(); @@ -858,8 +853,9 @@ static inline Status do_dist_set_op(LocalSetOperation local_operation, std::shared_ptr left_final_table; std::shared_ptr right_final_table; - RETURN_CYLON_STATUS_IF_FAILED(shuffle_two_tables_by_hashing(ctx, table_left, hash_columns, table_right, hash_columns, - left_final_table, right_final_table)); + RETURN_CYLON_STATUS_IF_FAILED( + shuffle_two_tables_by_hashing(ctx, table_left, hash_columns, table_right, hash_columns, + left_final_table, right_final_table)); std::shared_ptr left_tab = std::make_shared(ctx, left_final_table); std::shared_ptr right_tab = std::make_shared(ctx, right_final_table); @@ -884,9 +880,9 @@ Status DistributedIntersect(const std::shared_ptr
&left, const std::share } void ReadCSVThread(const std::shared_ptr &ctx, const std::string &path, - std::shared_ptr *table, - const cylon::io::config::CSVReadOptions &options, - const std::shared_ptr> &status_promise) { + std::shared_ptr *table, + const cylon::io::config::CSVReadOptions &options, + const std::shared_ptr> &status_promise) { // const std::shared_ptr &ctx_ = ctx; // make a copy of the shared ptr status_promise->set_value(FromCSV(ctx, path, *table, options)); } @@ -910,7 +906,7 @@ Status FromCSV(const std::shared_ptr &ctx, const std::vector &table, const std::vector &hash const auto &ctx_ = table->GetContext(); std::shared_ptr table_out; RETURN_CYLON_STATUS_IF_FAILED(shuffle_table_by_hashing(ctx_, table, hash_columns, table_out)); - return cylon::Table::FromArrowTable(ctx_, table_out, output); + return cylon::Table::FromArrowTable(ctx_, std::move(table_out), output); } Status Unique(const std::shared_ptr
&in, const std::vector &cols, @@ -1105,8 +1101,9 @@ Status DistributedUnique(const std::shared_ptr
&in, const std::vector& a, const std::shared_ptr& b, bool& result, bool ordered) { - if(ordered) { +Status Equals(const std::shared_ptr &a, const std::shared_ptr &b, + bool &result, bool ordered) { + if (ordered) { result = a->get_table()->Equals(*b->get_table()); } else { result = false; @@ -1153,7 +1150,7 @@ Status DistributedEquals(const std::shared_ptr &a, const std::shar std::vector indices(col); std::vector column_orders(col, true); std::iota(indices.begin(), indices.end(), 0); - + std::shared_ptr b_repartitioned; RETURN_CYLON_STATUS_IF_FAILED(RepartitionToMatchOtherTable(a, b, &b_repartitioned)); @@ -1172,7 +1169,7 @@ Status DistributedEquals(const std::shared_ptr &a, const std::shar return Status::OK(); } -Status Repartition(const std::shared_ptr& table, +Status Repartition(const std::shared_ptr& table, const std::vector& rows_per_partition, const std::vector& receive_build_rank_order, std::shared_ptr
*output) { @@ -1309,7 +1306,8 @@ const std::shared_ptr &Table::GetContext() const { Table::Table(const std::shared_ptr &ctx, std::shared_ptr tab) : ctx(ctx), table_(std::move(tab)), - base_arrow_index_(std::make_shared(0, table_->num_rows(), 1, cylon::ToArrowPool(ctx))) {} + base_arrow_index_(std::make_shared(0, table_->num_rows(), 1, + cylon::ToArrowPool(ctx))) {} #ifdef BUILD_CYLON_PARQUET Status FromParquet(const std::shared_ptr &ctx, const std::string &path, @@ -1349,10 +1347,11 @@ Status FromParquet(const std::shared_ptr &ctx, const std::vector>(); futures.emplace_back( read_promise->get_future(), - std::thread(ReadParquetThread, std::cref(ctx), std::cref(paths[kI]), tableOuts[kI], read_promise)); + std::thread(ReadParquetThread, std::cref(ctx), std::cref(paths[kI]), tableOuts[kI], + read_promise)); } bool all_passed = true; - for (auto &future : futures) { + for (auto &future: futures) { auto status = future.first.get(); all_passed &= status.is_ok(); future.second.join(); diff --git a/cpp/src/cylon/util/arrow_utils.cpp b/cpp/src/cylon/util/arrow_utils.cpp index 8518f9aa1..79ede60d4 100644 --- a/cpp/src/cylon/util/arrow_utils.cpp +++ b/cpp/src/cylon/util/arrow_utils.cpp @@ -347,6 +347,21 @@ std::array GetBytesAndElements(std::shared_ptr table, } return {num_elements, num_bytes}; } +arrow::Status CreateEmptyTable(const std::shared_ptr &schema, + std::shared_ptr *output, + arrow::MemoryPool *pool) { + std::vector> arrays; + arrays.reserve(schema->num_fields()); + + for (int i = 0; i < schema->num_fields(); i++){ + const auto& t = schema->field(i)->type(); + ARROW_ASSIGN_OR_RAISE(auto arr, arrow::MakeArrayOfNull(t, 0, pool)) + arrays.emplace_back(std::make_shared(std::move(arr))); + } + + *output = arrow::Table::Make(schema, std::move(arrays), 0); + return arrow::Status::OK(); +} arrow::Status MakeEmptyArrowTable(const std::shared_ptr &schema, std::shared_ptr *table, diff --git a/cpp/src/cylon/util/arrow_utils.hpp b/cpp/src/cylon/util/arrow_utils.hpp index 922a683ec..777ab0b6a 100644 --- a/cpp/src/cylon/util/arrow_utils.hpp +++ b/cpp/src/cylon/util/arrow_utils.hpp @@ -143,7 +143,12 @@ uint64_t GetNumberSplitsToFitInCache(int64_t total_bytes, int total_elements, in * @param columns * @return */ -std::array GetBytesAndElements(std::shared_ptr table, const std::vector &columns); +std::array GetBytesAndElements(std::shared_ptr table, + const std::vector &columns); + +arrow::Status CreateEmptyTable(const std::shared_ptr &schema, + std::shared_ptr *output, + arrow::MemoryPool *pool = arrow::default_memory_pool()); arrow::Status MakeEmptyArrowTable(const std::shared_ptr& schema, std::shared_ptr* table, arrow::MemoryPool *pool = arrow::default_memory_pool()); diff --git a/cpp/src/examples/CMakeLists.txt b/cpp/src/examples/CMakeLists.txt index cacbc4eae..cefd6a667 100644 --- a/cpp/src/examples/CMakeLists.txt +++ b/cpp/src/examples/CMakeLists.txt @@ -54,6 +54,7 @@ cylon_add_exe(table_from_vectors_example) cylon_add_exe(compute_example) cylon_add_exe(groupby_pipeline_example) cylon_add_exe(groupby_example) +cylon_add_exe(groupby_perf) cylon_add_exe(unique_example) cylon_add_exe(indexing_example) cylon_add_exe(sorting_example) diff --git a/cpp/src/examples/groupby_example.cpp b/cpp/src/examples/groupby_example.cpp index 84098cef5..293d2e213 100644 --- a/cpp/src/examples/groupby_example.cpp +++ b/cpp/src/examples/groupby_example.cpp @@ -65,24 +65,31 @@ int main(int argc, char *argv[]) { output) CHECK_STATUS_AND_PRINT(first_table, - cylon::HashGroupBy(first_table, {0, 1}, {{2, 
cylon::compute::VarOp::Make()}}, output), + cylon::HashGroupBy(first_table, + {0, 1}, + {{2, std::make_shared()}}, + output), output) CHECK_STATUS_AND_PRINT(first_table, - cylon::DistributedHashGroupBy(first_table, {0, 1}, {2}, {cylon::compute::VAR}, output), + cylon::DistributedHashGroupBy(first_table, {0, 1}, {2}, + {cylon::compute::VAR}, output), output) CHECK_STATUS_AND_PRINT(first_table, - cylon::HashGroupBy(first_table, {0, 1}, {{2, cylon::compute::NUNIQUE}}, output), + cylon::HashGroupBy(first_table, {0, 1}, + {{2, cylon::compute::NUNIQUE}}, output), output) - CHECK_STATUS_AND_PRINT(first_table, - cylon::HashGroupBy(first_table, {0, 1}, {{2, cylon::compute::QuantileOp::Make(0.2)}}, output), - output) + CHECK_STATUS_AND_PRINT( + first_table, + cylon::HashGroupBy(first_table, {0, 1}, + {{2, std::make_shared(0.2)}}, output), output) - CHECK_STATUS_AND_PRINT(first_table, - cylon::DistributedHashGroupBy(first_table, {0, 1}, {2}, {cylon::compute::COUNT}, output), - output) + CHECK_STATUS_AND_PRINT( + first_table, + cylon::DistributedHashGroupBy(first_table, {0, 1}, {2}, {cylon::compute::COUNT}, output), + output) ctx->Finalize(); return 0; diff --git a/cpp/src/examples/groupby_perf.cpp b/cpp/src/examples/groupby_perf.cpp new file mode 100644 index 000000000..d33657c7e --- /dev/null +++ b/cpp/src/examples/groupby_perf.cpp @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#include + +#include +#include + +#include "example_utils.hpp" + +#define CYLON_LOG_HELP() \ + do{ \ + LOG(ERROR) << "input arg error " << std::endl \ + << "./groupby_perf m num_tuples_per_worker dup_factor[0.0-1.0]" << std::endl \ + << "./groupby_perf f csv_file1" << std::endl; \ + return 1; \ + } while(0) + +int main(int argc, char *argv[]) { + if (argc != 3 && argc != 4) { + CYLON_LOG_HELP(); + } + + auto start_start = std::chrono::steady_clock::now(); + auto mpi_config = std::make_shared(); + auto ctx = cylon::CylonContext::InitDistributed(mpi_config); + + std::shared_ptr table, output; + auto read_options = cylon::io::config::CSVReadOptions().UseThreads(false).BlockSize(1 << 30); + + std::string mem = std::string(argv[1]); + if (mem == "m" && argc == 4) { + LOG(INFO) << "using in-mem tables"; + int64_t count = std::stoll(argv[2]); + double dup = std::stod(argv[3]); + if (cylon::examples::create_in_memory_tables(count, dup, ctx, table, 0)) { + LOG(ERROR) << "table creation failed!"; + return 1; + } + } else if (mem == "f" && argc == 3) { + LOG(INFO) << "using files"; + if (!cylon::FromCSV(ctx, std::string(argv[2]) + std::to_string(ctx->GetRank()) + ".csv", table) + .is_ok()) { + LOG(ERROR) << "file reading failed!"; + return 1; + } + } else { + CYLON_LOG_HELP(); + } + + ctx->Barrier(); + auto read_end_time = std::chrono::steady_clock::now(); + LOG(INFO) << "Input tables created in " + << std::chrono::duration_cast( + read_end_time - start_start).count() << "[ms]"; + + auto status = cylon::DistributedHashGroupBy(table, 0, {1, 1, 1}, + {cylon::compute::SUM, cylon::compute::MEAN, + cylon::compute::STDDEV}, + output); + auto end_time = std::chrono::steady_clock::now(); + + if (!status.is_ok()) { + LOG(INFO) << "Unique failed " << status.get_msg(); + ctx->Finalize(); + return 1; + } + + LOG(INFO) << "Table had : " << table->Rows() << ", output has : " << output->Rows(); + LOG(INFO) << "Completed in " + << std::chrono::duration_cast(end_time - read_end_time) + .count() << "[ms]"; + + read_end_time = std::chrono::steady_clock::now(); + LOG(INFO) << "Input tables created in " + << std::chrono::duration_cast( + read_end_time - start_start).count() << "[ms]"; + + cylon::mapred::AggOpVector ops{{1, cylon::compute::SumOp::Make()}, + {1, cylon::compute::MeanOp::Make()}, + {1, cylon::compute::StdDevOp::Make()}}; + status = cylon::mapred::MapredHashGroupBy(table, {0}, ops, &output); + end_time = std::chrono::steady_clock::now(); + + if (!status.is_ok()) { + LOG(INFO) << "Unique failed " << status.get_msg(); + ctx->Finalize(); + return 1; + } + + LOG(INFO) << "Table had : " << table->Rows() << ", output has : " << output->Rows(); + LOG(INFO) << "Completed in " + << std::chrono::duration_cast(end_time - read_end_time) + .count() << "[ms]"; + + ctx->Finalize(); + return 0; +} diff --git a/cpp/test/aggregate_test.cpp b/cpp/test/aggregate_test.cpp index 259d7fe4b..153bc5996 100644 --- a/cpp/test/aggregate_test.cpp +++ b/cpp/test/aggregate_test.cpp @@ -13,10 +13,14 @@ */ #include +#include +#include + #include "common/test_header.hpp" #include "test_utils.hpp" -using namespace cylon; +namespace cylon { +namespace test { TEST_CASE("aggregate testing", "[aggregates]") { const int rows = 12; @@ -37,7 +41,8 @@ TEST_CASE("aggregate testing", "[aggregates]") { auto res_scalar = std::static_pointer_cast(result->GetResult().scalar()); std::cout << " " << res_scalar->value << std::endl; - REQUIRE(res_scalar->value == ((double) (rows * (rows - 1) / 2.0) + 10.0 * rows) * ctx->GetWorldSize()); + 
REQUIRE(res_scalar->value + == ((double) (rows * (rows - 1) / 2.0) + 10.0 * rows) * ctx->GetWorldSize()); } SECTION("testing count") { @@ -64,7 +69,7 @@ TEST_CASE("aggregate testing", "[aggregates]") { REQUIRE(res_scalar->value == 10.0 + (double) (rows - 1)); } - // Adding Table output based Aggregates + // Adding Table output based Aggregates SECTION("testing table:sum") { status = cylon::compute::Sum(table, 1, output); @@ -105,7 +110,130 @@ TEST_CASE("aggregate testing", "[aggregates]") { REQUIRE(val == 10.0 + (double) (rows - 1)); } +} +TEMPLATE_LIST_TEST_CASE("mapred kernels", "[mapred]", ArrowNumericTypes) { + auto type = default_type_instance(); + auto pool = ToArrowPool(ctx); + + SECTION("sum") { + INFO("sum " + type->ToString()) + auto kern = mapred::MakeMapReduceKernel(type, compute::SUM); + + // init + kern->Init(pool, /*options=*/nullptr); + + // combine + auto arr = ArrayFromJSON(type, "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]"); + auto g_ids = ArrayFromJSON(arrow::int64(), "[0, 0, 1, 1, 2, 2, 3, 3, 4, 4]"); + arrow::ArrayVector array_vector; + CHECK_CYLON_STATUS(kern->CombineLocally(arr, g_ids, 5, &array_vector)); + REQUIRE(array_vector.size() == 1); + REQUIRE(array_vector[0]->length() == 5); + CHECK_ARROW_EQUAL(ArrayFromJSON(type, "[1, 5, 9, 13, 17]"), array_vector[0]); + + // reduce + CHECK_CYLON_STATUS(kern->ReduceShuffledResults({arr}, g_ids, nullptr, 5, &array_vector)); + CHECK_ARROW_EQUAL(ArrayFromJSON(type, "[1, 5, 9, 13, 17]"), array_vector[0]); + + //finalize + std::shared_ptr out; + CHECK_CYLON_STATUS(kern->Finalize(array_vector, &out)); + CHECK_ARROW_EQUAL(array_vector[0], out); + } + + SECTION("count") { + INFO("count " + type->ToString()) + auto kern = mapred::MakeMapReduceKernel(type, compute::COUNT); + + // init + kern->Init(pool, /*options=*/nullptr); + + // combine + auto arr = ArrayFromJSON(type, "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]"); + auto g_ids = ArrayFromJSON(arrow::int64(), "[0, 0, 1, 1, 2, 2, 3, 3, 4, 4]"); + arrow::ArrayVector array_vector; + CHECK_CYLON_STATUS(kern->CombineLocally(arr, g_ids, 5, &array_vector)); + REQUIRE(array_vector.size() == 1); + REQUIRE(array_vector[0]->length() == 5); + CHECK_ARROW_EQUAL(ArrayFromJSON(arrow::int64(), "[2, 2, 2, 2, 2]"), array_vector[0]); + + // reduce + arr = ArrayFromJSON(arrow::int64(), "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]"); + CHECK_CYLON_STATUS(kern->ReduceShuffledResults({arr}, g_ids, nullptr, 5, &array_vector)); + CHECK_ARROW_EQUAL(ArrayFromJSON(arrow::int64(), "[1, 5, 9, 13, 17]"), array_vector[0]); + + //finalize + std::shared_ptr out; + CHECK_CYLON_STATUS(kern->Finalize(array_vector, &out)); + CHECK_ARROW_EQUAL(array_vector[0], out); + } + SECTION("mean") { + INFO("mean " + type->ToString()) + auto kern = mapred::MakeMapReduceKernel(type, compute::MEAN); + + // init + kern->Init(pool, /*options=*/nullptr); + + // combine + auto arr = ArrayFromJSON(type, "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]"); + auto g_ids = ArrayFromJSON(arrow::int64(), "[0, 0, 1, 1, 2, 2, 3, 3, 4, 4]"); + arrow::ArrayVector array_vector; + CHECK_CYLON_STATUS(kern->CombineLocally(arr, g_ids, 5, &array_vector)); + REQUIRE(array_vector.size() == 2); + REQUIRE(array_vector[0]->length() == 5); + CHECK_ARROW_EQUAL(ArrayFromJSON(type, "[1, 5, 9, 13, 17]"), array_vector[0]); + CHECK_ARROW_EQUAL(ArrayFromJSON(arrow::int64(), "[2, 2, 2, 2, 2]"), array_vector[1]); + + // reduce + auto cnts = ArrayFromJSON(arrow::int64(), "[2, 3, 2, 3, 2, 3, 2, 3, 2, 3]"); + CHECK_CYLON_STATUS( + kern->ReduceShuffledResults({arr, cnts}, g_ids, nullptr, 5, &array_vector)); + 
CHECK_ARROW_EQUAL(ArrayFromJSON(type, "[1, 5, 9, 13, 17]"), array_vector[0]); + CHECK_ARROW_EQUAL(ArrayFromJSON(arrow::int64(), "[5, 5, 5, 5, 5]"), array_vector[1]); + + //finalize + std::shared_ptr out; + CHECK_CYLON_STATUS(kern->Finalize(array_vector, &out)); + auto exp = ArrayFromJSON(arrow::float64(), "[0.2, 1, 1.8, 2.6, 3.4]"); + auto exp_cast = *arrow::compute::Cast(*exp, type, arrow::compute::CastOptions::Unsafe()); + CHECK_ARROW_EQUAL(exp_cast, out); + } +} + +TEMPLATE_LIST_TEST_CASE("mapred local aggregate", "[mapred]", ArrowNumericTypes) { + // if distributed, return + if (ctx->GetWorldSize() > 1) { return; } + + auto type = default_type_instance(); + + auto key = ArrayFromJSON(type, "[0, 1, 2, 2, 3, 3, 3, 4, 4, 4, 4]"); + auto val = ArrayFromJSON(type, "[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]"); + + auto schema = arrow::schema({arrow::field("a", type), arrow::field("b", type)}); + std::shared_ptr
table, output; + CHECK_CYLON_STATUS(Table::FromArrowTable(ctx, arrow::Table::Make(schema, {key, val}), table)); + + mapred::AggOpVector + ops{{1, compute::SumOp::Make()}, {1, compute::CountOp::Make()}, {1, compute::MeanOp::Make()}}; + CHECK_CYLON_STATUS(mapred::MapredHashGroupBy(table, {0}, ops, &output)); + + auto exp_key = ArrayFromJSON(type, "[0, 1, 2, 3, 4]"); + auto exp_sum = ArrayFromJSON(type, "[0, 1, 5, 15, 34]"); + auto exp_cnt = ArrayFromJSON(arrow::int64(), "[1, 1, 2, 3, 4]"); + auto avg = ArrayFromJSON(arrow::float64(), "[0, 1, 2.5, 5, 8.5]"); + auto exp_avg = *arrow::compute::Cast(*avg, type, arrow::compute::CastOptions::Unsafe()); + auto exp_tab = arrow::Table::Make(arrow::schema({ + arrow::field("a", type), + arrow::field("b_sum", type), + arrow::field("b_count", arrow::int64()), + arrow::field("b_mean", type) + }), + {exp_key, exp_sum, exp_cnt, exp_avg}); + + CHECK_ARROW_EQUAL(exp_tab, output->get_table()); +} } +} \ No newline at end of file diff --git a/cpp/test/groupby_test.cpp b/cpp/test/groupby_test.cpp index d9121376b..91fb85254 100644 --- a/cpp/test/groupby_test.cpp +++ b/cpp/test/groupby_test.cpp @@ -22,6 +22,8 @@ #include "common/test_header.hpp" +#include "cylon/mapreduce/mapreduce.hpp" + namespace cylon { namespace test { @@ -40,15 +42,27 @@ Status create_table(const std::shared_ptr &ctx_, Status HashCylonGroupBy(std::shared_ptr
&ctable, const compute::AggregationOpId &aggregate_ops, std::shared_ptr
&output) { - + INFO("generic groupby"); CHECK_CYLON_STATUS(DistributedHashGroupBy(ctable, 0, {1}, {aggregate_ops}, output)); INFO("hash_group op:" << aggregate_ops << " rows:" << output->Rows()); return Status::OK(); } +Status HashCylonGroupByMapred(std::shared_ptr
&ctable, + const compute::AggregationOpId &aggregate_ops, + std::shared_ptr
&output) { + INFO("mapred groupby"); + CHECK_CYLON_STATUS( + mapred::MapredHashGroupBy(ctable, {0}, + {{1, compute::MakeAggregationOpFromID(aggregate_ops)}}, + &output)); + INFO("hash_group op:" << aggregate_ops << " rows:" << output->Rows()); + return Status::OK(); +} + + Status PipelineCylonGroupBy(std::shared_ptr
&ctable, std::shared_ptr
&output) { - CHECK_CYLON_STATUS(DistributedPipelineGroupBy(ctable, 0, {1}, {compute::SUM}, output)); INFO("pipe_group " << output->Rows()); return Status::OK(); @@ -67,46 +81,52 @@ TEMPLATE_LIST_TEST_CASE("groupby testing", "[groupby]", ArrowNumericTypes) { std::shared_ptr result; - SECTION("testing hash group by result") { - CHECK_CYLON_STATUS(HashCylonGroupBy(table, compute::SUM, output1)); + SECTION("testing hash group by sum") { + for (auto FnPtr: {&HashCylonGroupBy, &HashCylonGroupByMapred}) { + CHECK_CYLON_STATUS(FnPtr(table, compute::SUM, output1)); - CHECK_CYLON_STATUS(compute::Sum(output1, 0, result)); - auto idx_sum = std::static_pointer_cast(result->GetResult().scalar()); - INFO("idx_sum " << idx_sum->value); - REQUIRE(idx_sum->value == 6); // 3* 4/ 2 + CHECK_CYLON_STATUS(compute::Sum(output1, 0, result)); + auto idx_sum = std::static_pointer_cast(result->GetResult().scalar()); + INFO("idx_sum " << idx_sum->value); + REQUIRE(idx_sum->value == 6); // 3* 4/ 2 - CHECK_CYLON_STATUS(compute::Sum(output1, 1, result)); - auto val_sum = std::static_pointer_cast(result->GetResult().scalar()); - INFO("val_sum " << val_sum->ToString()); - REQUIRE(val_sum->value == (T) (2 * 6 * ctx->GetWorldSize())); + CHECK_CYLON_STATUS(compute::Sum(output1, 1, result)); + auto val_sum = std::static_pointer_cast(result->GetResult().scalar()); + INFO("val_sum " << val_sum->ToString()); + REQUIRE(val_sum->value == (T) (2 * 6 * ctx->GetWorldSize())); + } } SECTION("testing hash group by count") { - CHECK_CYLON_STATUS(HashCylonGroupBy(table, compute::COUNT, output1)); - - CHECK_CYLON_STATUS(compute::Sum(output1, 0, result)); - auto idx_sum = std::static_pointer_cast(result->GetResult().scalar()); - INFO("idx_sum " << idx_sum->value); - REQUIRE(idx_sum->value == 6); // 3* 4/ 2 - - CHECK_CYLON_STATUS(compute::Sum(output1, 1, result)); - auto val_sum = std::static_pointer_cast(result->GetResult().scalar()); - INFO("val_sum " << val_sum->value); - REQUIRE(val_sum->value == 4 * 2 * ctx->GetWorldSize()); + for (auto FnPtr: {&HashCylonGroupBy, &HashCylonGroupByMapred}) { + CHECK_CYLON_STATUS(FnPtr(table, compute::COUNT, output1)); + + CHECK_CYLON_STATUS(compute::Sum(output1, 0, result)); + auto idx_sum = std::static_pointer_cast(result->GetResult().scalar()); + INFO("idx_sum " << idx_sum->value); + REQUIRE(idx_sum->value == 6); // 3* 4/ 2 + + CHECK_CYLON_STATUS(compute::Sum(output1, 1, result)); + auto val_sum = std::static_pointer_cast(result->GetResult().scalar()); + INFO("val_sum " << val_sum->value); + REQUIRE(val_sum->value == 4 * 2 * ctx->GetWorldSize()); + } } SECTION("testing hash group by mean") { - CHECK_CYLON_STATUS(HashCylonGroupBy(table, compute::MEAN, output1)); - - CHECK_CYLON_STATUS(compute::Sum(output1, 0, result)); - auto idx_sum = std::static_pointer_cast(result->GetResult().scalar()); - INFO("idx_sum " << idx_sum->value); - REQUIRE(idx_sum->value == 6); // 3* 4/ 2 - - CHECK_CYLON_STATUS(compute::Sum(output1, 1, result)); - auto val_sum = std::static_pointer_cast(result->GetResult().scalar()); - INFO("val_sum " << val_sum->value); - REQUIRE(val_sum->value == 6.0); + for (auto FnPtr: {&HashCylonGroupBy, &HashCylonGroupByMapred}) { + CHECK_CYLON_STATUS(FnPtr(table, compute::MEAN, output1)); + + CHECK_CYLON_STATUS(compute::Sum(output1, 0, result)); + auto idx_sum = std::static_pointer_cast(result->GetResult().scalar()); + INFO("idx_sum " << idx_sum->value); + REQUIRE(idx_sum->value == 6); // 3* 4/ 2 + + CHECK_CYLON_STATUS(compute::Sum(output1, 1, result)); + auto val_sum = 
std::static_pointer_cast(result->GetResult().scalar()); + INFO("val_sum " << val_sum->value); + REQUIRE(val_sum->value == 6.0); + } } SECTION("testing hash group by var") { diff --git a/cpp/test/test_macros.hpp b/cpp/test/test_macros.hpp index b29764735..587a1977d 100644 --- a/cpp/test/test_macros.hpp +++ b/cpp/test/test_macros.hpp @@ -20,18 +20,18 @@ #define CHECK_ARROW_EQUAL(expected, received) \ do { \ - const auto& exp = (expected); \ - const auto& rec = (received); \ - INFO("Expected: " << exp->ToString() << "\nReceived: " << rec->ToString());\ - REQUIRE(exp->Equals(*rec)); \ + const auto& exp_ = (expected); \ + const auto& rec_ = (received); \ + INFO("Expected: " << exp_->ToString() << "\nReceived: " << rec_->ToString());\ + REQUIRE(exp_->Equals(*rec_)); \ } while(0) #define CHECK_ARROW_BUFFER_EQUAL(expected, received) \ do { \ - const auto& exp = (expected); \ - const auto& rec = (received); \ - INFO("Expected: " << exp->ToHexString() << "\nReceived: " << rec->ToHexString());\ - REQUIRE(exp->Equals(*rec)); \ + const auto& exp_ = (expected); \ + const auto& rec_ = (received); \ + INFO("Expected: " << exp_->ToHexString() << "\nReceived: " << rec_->ToHexString());\ + REQUIRE(exp_->Equals(*rec_)); \ } while(0) #define CHECK_CYLON_STATUS(expr) \ diff --git a/docs/static/pydocs/frame.html b/docs/static/pydocs/frame.html index c64839aeb..1466b9fce 100644 --- a/docs/static/pydocs/frame.html +++ b/docs/static/pydocs/frame.html @@ -1704,7 +1704,7 @@

Module frame

Examples -------- - Combine two ``DataFrame`` objects with identical columns. + CombineBeforeShuffle two ``DataFrame`` objects with identical columns. >>> df1 = DataFrame([['a', 1], ['b', 2]], ... columns=['letter', 'number']) @@ -1725,7 +1725,7 @@

Module frame

0 c 3 1 d 4 - (Unsupported) Combine ``DataFrame`` objects with overlapping columns + (Unsupported) CombineBeforeShuffle ``DataFrame`` objects with overlapping columns and return everything. Columns outside the intersection will be filled with ``NaN`` values. @@ -1742,7 +1742,7 @@

Module frame

0 c 3 cat 1 d 4 dog - (Unsupported) Combine ``DataFrame`` objects with overlapping columns + (Unsupported) CombineBeforeShuffle ``DataFrame`` objects with overlapping columns and return only those that are shared by passing ``inner`` to the ``join`` keyword argument. @@ -1753,7 +1753,7 @@

Module frame

0 c 3 1 d 4 - (Unsupported) Combine ``DataFrame`` objects horizontally along the x axis by + (Unsupported) CombineBeforeShuffle ``DataFrame`` objects horizontally along the x axis by passing in ``axis=1``. >>> df4 = DataFrame([['bird', 'polly'], ['monkey', 'george']], @@ -3782,7 +3782,7 @@

Returns

Examples -------- - Combine two ``DataFrame`` objects with identical columns. + CombineBeforeShuffle two ``DataFrame`` objects with identical columns. >>> df1 = DataFrame([['a', 1], ['b', 2]], ... columns=['letter', 'number']) @@ -3803,7 +3803,7 @@

Returns

0 c 3 1 d 4 - (Unsupported) Combine ``DataFrame`` objects with overlapping columns + (Unsupported) CombineBeforeShuffle ``DataFrame`` objects with overlapping columns and return everything. Columns outside the intersection will be filled with ``NaN`` values. @@ -3820,7 +3820,7 @@

Returns

0 c 3 cat 1 d 4 dog - (Unsupported) Combine ``DataFrame`` objects with overlapping columns + (Unsupported) CombineBeforeShuffle ``DataFrame`` objects with overlapping columns and return only those that are shared by passing ``inner`` to the ``join`` keyword argument. @@ -3831,7 +3831,7 @@

Returns

0 c 3 1 d 4 - (Unsupported) Combine ``DataFrame`` objects horizontally along the x axis by + (Unsupported) CombineBeforeShuffle ``DataFrame`` objects horizontally along the x axis by passing in ``axis=1``. >>> df4 = DataFrame([['bird', 'polly'], ['monkey', 'george']], @@ -4181,7 +4181,7 @@

Parameters

the columns (axis=1) or rows (axis=0), a DataFrame is returned.

Examples

-

Combine two DataFrame objects with identical columns.

+

CombineBeforeShuffle two DataFrame objects with identical columns.

>>> df1 = DataFrame([['a', 1], ['b', 2]],
 ...                    columns=['letter', 'number'])
 >>> df1
@@ -4201,7 +4201,7 @@ 

Examples

0 c 3 1 d 4
-

(Unsupported) Combine DataFrame objects with overlapping columns +

(Unsupported) CombineBeforeShuffle DataFrame objects with overlapping columns and return everything. Columns outside the intersection will be filled with NaN values.

>>> df3 = DataFrame([['c', 3, 'cat'], ['d', 4, 'dog']],
@@ -4217,7 +4217,7 @@ 

Examples

0 c 3 cat 1 d 4 dog
-

(Unsupported) Combine DataFrame objects with overlapping columns +

(Unsupported) CombineBeforeShuffle DataFrame objects with overlapping columns and return only those that are shared by passing inner to the join keyword argument.

>>> DataFrame.concat([df1, df3], join="inner")
@@ -4227,7 +4227,7 @@ 

Examples

0 c 3 1 d 4
-

(Unsupported) Combine DataFrame objects horizontally along the x axis by +

(Unsupported) CombineBeforeShuffle DataFrame objects horizontally along the x axis by passing in axis=1.

>>> df4 = DataFrame([['bird', 'polly'], ['monkey', 'george']],
 ...                    columns=['animal', 'name'])
@@ -4335,7 +4335,7 @@ 

Examples

Examples -------- - Combine two ``DataFrame`` objects with identical columns. + CombineBeforeShuffle two ``DataFrame`` objects with identical columns. >>> df1 = DataFrame([['a', 1], ['b', 2]], ... columns=['letter', 'number']) @@ -4356,7 +4356,7 @@

Examples

0 c 3 1 d 4 - (Unsupported) Combine ``DataFrame`` objects with overlapping columns + (Unsupported) CombineBeforeShuffle ``DataFrame`` objects with overlapping columns and return everything. Columns outside the intersection will be filled with ``NaN`` values. @@ -4373,7 +4373,7 @@

Examples

0 c 3 cat 1 d 4 dog - (Unsupported) Combine ``DataFrame`` objects with overlapping columns + (Unsupported) CombineBeforeShuffle ``DataFrame`` objects with overlapping columns and return only those that are shared by passing ``inner`` to the ``join`` keyword argument. @@ -4384,7 +4384,7 @@

Examples

0 c 3 1 d 4 - (Unsupported) Combine ``DataFrame`` objects horizontally along the x axis by + (Unsupported) CombineBeforeShuffle ``DataFrame`` objects horizontally along the x axis by passing in ``axis=1``. >>> df4 = DataFrame([['bird', 'polly'], ['monkey', 'george']], diff --git a/python/pycylon/pycylon/data/groupby.pxd b/python/pycylon/pycylon/data/groupby.pxd index 3456c5e41..c17c9323d 100644 --- a/python/pycylon/pycylon/data/groupby.pxd +++ b/python/pycylon/pycylon/data/groupby.pxd @@ -17,6 +17,7 @@ from pycylon.common.status cimport CStatus from pycylon.data.table cimport CTable from libcpp.memory cimport shared_ptr from libcpp.vector cimport vector +from libcpp.pair cimport pair from pycylon.data.aggregates cimport CGroupByAggregationOp @@ -33,3 +34,9 @@ cdef extern from "../../../../cpp/src/cylon/groupby/groupby.hpp" namespace "cylo & aggregate_cols, const vector[CGroupByAggregationOp] & aggregate_ops, shared_ptr[CTable] & output) + +cdef extern from "../../../../cpp/src/cylon/mapreduce/mapreduce.hpp" namespace "cylon::mapred": + CStatus MapredHashGroupBy(shared_ptr[CTable] & table, + const vector[int] key_cols, + const vector[pair[int, CGroupByAggregationOp]] & aggs, + shared_ptr[CTable] * output) diff --git a/python/pycylon/pycylon/data/table.pyx b/python/pycylon/pycylon/data/table.pyx index 0af0cb1d0..742557d46 100644 --- a/python/pycylon/pycylon/data/table.pyx +++ b/python/pycylon/pycylon/data/table.pyx @@ -14,6 +14,8 @@ from libcpp.string cimport string from libc.stdint cimport int64_t +from libcpp.vector cimport vector +from libcpp.pair cimport pair from pycylon.common.status cimport CStatus from pycylon.common.status import Status from pycylon.common.join_config cimport CJoinType @@ -47,7 +49,7 @@ pycylon_unwrap_join_config) from pycylon.data.aggregates cimport (Sum, Count, Min, Max) from pycylon.data.aggregates cimport CGroupByAggregationOp from pycylon.data.aggregates import AggregationOp, AggregationOpString -from pycylon.data.groupby cimport (DistributedHashGroupBy, DistributedPipelineGroupBy) +from pycylon.data.groupby cimport (DistributedHashGroupBy, DistributedPipelineGroupBy, MapredHashGroupBy) from pycylon.data import compute from pycylon.index import RangeIndex, NumericIndex, range_calculator, process_index_by_value @@ -578,12 +580,16 @@ cdef class Table: def max(self, column): return self._agg_op(column, AggregationOp.MAX) - def groupby(self, index, agg: dict): + def groupby(self, index, agg: dict, groupby_type: str = 'hash'): + """ + :param groupby_type: 'hash' or 'mapred_hash' + """ cdef CStatus status cdef shared_ptr[CTable] output cdef vector[int] cindex_cols cdef vector[int] caggregate_cols cdef vector[CGroupByAggregationOp] caggregate_ops + cdef vector[pair[int, CGroupByAggregationOp]] caggregate_ops_pair if not agg or not isinstance(agg, dict): raise ValueError("agg should be non-empty and dict type") @@ -601,16 +607,22 @@ cdef class Table: if isinstance(agg_pair[1], str): caggregate_cols.push_back(col_idx) caggregate_ops.push_back(AggregationOpString[agg_pair[1]]) + caggregate_ops_pair.push_back(pair[int, CGroupByAggregationOp](col_idx, + AggregationOpString[agg_pair[1]])) elif isinstance(agg_pair[1], AggregationOp): caggregate_cols.push_back(col_idx) caggregate_ops.push_back(agg_pair[1]) + caggregate_ops_pair.push_back(pair[int, CGroupByAggregationOp](col_idx, agg_pair[1])) elif isinstance(agg_pair[1], list): for op in agg_pair[1]: caggregate_cols.push_back(col_idx) if isinstance(op, str): caggregate_ops.push_back(AggregationOpString[op]) + 
caggregate_ops_pair.push_back(pair[int, CGroupByAggregationOp](col_idx, + AggregationOpString[op])) elif isinstance(op, AggregationOp): caggregate_ops.push_back(op) + caggregate_ops_pair.push_back(pair[int, CGroupByAggregationOp](col_idx, op)) else: raise ValueError("Agg op must be either op name (str) or AggregationOp enum or " "a list of either of those") @@ -633,8 +645,14 @@ cdef class Table: raise ValueError("Index column must be either column name (str) or column " "index (int)") - status = DistributedHashGroupBy(self.table_shd_ptr, cindex_cols, caggregate_cols, - caggregate_ops, output) + if groupby_type.lower() == 'hash': + status = DistributedHashGroupBy(self.table_shd_ptr, cindex_cols, caggregate_cols, + caggregate_ops, output) + elif groupby_type.lower() == 'mapred_hash': + status = MapredHashGroupBy(self.table_shd_ptr, cindex_cols, caggregate_ops_pair, &output) + else: + raise Exception(f"Unknown groupby type {groupby_type}. Available [hash, mapred_hash]") + if status.is_ok(): return pycylon_wrap_table(output) else: diff --git a/python/pycylon/pycylon/frame.py b/python/pycylon/pycylon/frame.py index 57f106b90..1430dd818 100644 --- a/python/pycylon/pycylon/frame.py +++ b/python/pycylon/pycylon/frame.py @@ -123,14 +123,15 @@ def barrier(self): class GroupByDataFrame(object): - def __init__(self, df: DataFrame, by=None) -> None: + def __init__(self, df: DataFrame, by=None, groupby_type: str = 'hash') -> None: super().__init__() self.df = df self.by = by self.by_diff = set(df.columns) - set(by) + self.groupby_type = groupby_type def __do_groupby(self, op_dict) -> DataFrame: - return DataFrame(self.df.to_table().groupby(self.by, op_dict)) + return DataFrame(self.df.to_table().groupby(self.by, op_dict, groupby_type=self.groupby_type)) def __apply_on_remaining_columns(self, op: str) -> DataFrame: op_dict = {} @@ -2164,7 +2165,7 @@ def drop_duplicates(self, subset: Optional[Union[Hashable, Sequence[Hashable]]] return DataFrame(self._change_context(env)._table.distributed_unique(columns=subset, inplace=inplace)) - def groupby(self, by: Union[int, str, List], env: CylonEnv = None) -> GroupByDataFrame: + def groupby(self, by: Union[int, str, List], groupby_type="hash", env: CylonEnv = None) -> GroupByDataFrame: """ A groupby operation involves some combination of splitting the object, applying a function, and combining the results. This can be used to group large amounts of data and compute operations on these groups. @@ -2174,6 +2175,8 @@ def groupby(self, by: Union[int, str, List], env: CylonEnv = None) -> GroupByDat by : str, int or a list of str, int. List of column(s) used for grouping. + groupby_type: str, + Groupby Type - [hash, mapred_hash] default: hash Returns ------- @@ -2228,9 +2231,9 @@ def groupby(self, by: Union[int, str, List], env: CylonEnv = None) -> GroupByDat else: raise ValueError("Unknown value for by") if env is None: - return GroupByDataFrame(self, by_list) + return GroupByDataFrame(self, by_list, groupby_type=groupby_type) else: - return GroupByDataFrame(self._change_context(env), by_list) + return GroupByDataFrame(self._change_context(env), by_list, groupby_type=groupby_type) def isin(self, values: Union[List, Dict, cn.Table], skip_null: bool = True) -> DataFrame: """
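
For reference, the sketch below shows how the new `cylon::mapred::MapredHashGroupBy` entry points are intended to be called from C++, following the usage in `cpp/src/examples/groupby_perf.cpp` and `cpp/test/groupby_test.cpp`. It is a minimal sketch rather than part of the patch: the wrapper name `RunMapredGroupBy` is hypothetical, and the template arguments (elided elsewhere in this diff) are reconstructed assuming the usual `cylon::Table` based API.

#include "cylon/mapreduce/mapreduce.hpp"

// Group `table` by column 0 and aggregate column 1 with SUM and MEAN via the
// mapreduce-based groupby added in this patch.
cylon::Status RunMapredGroupBy(const std::shared_ptr<cylon::Table> &table,
                               std::shared_ptr<cylon::Table> *output) {
  // Option A: pass pre-built AggregationOp instances (as groupby_perf.cpp does).
  cylon::mapred::AggOpVector ops{{1, cylon::compute::SumOp::Make()},
                                 {1, cylon::compute::MeanOp::Make()}};
  auto status = cylon::mapred::MapredHashGroupBy(table, /*key_cols=*/{0}, ops, output);
  if (!status.is_ok()) {
    return status;
  }

  // Option B: pass (column, AggregationOpId) pairs; this overload builds the ops
  // internally via compute::MakeAggregationOpFromID.
  cylon::mapred::AggOpIdVector op_ids{{1, cylon::compute::SUM}, {1, cylon::compute::MEAN}};
  return cylon::mapred::MapredHashGroupBy(table, /*key_cols=*/{0}, op_ids, output);
}

On the Python side the same path is exposed through the new `groupby_type` argument, i.e. `Table.groupby(index, agg, groupby_type='mapred_hash')` and `DataFrame.groupby(by, groupby_type='mapred_hash')`.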
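The stage contract documented on `MapReduceKernel` (Init, CombineLocally, shuffle, ReduceShuffledResults, Finalize) can also be driven directly, as the new "mapred kernels" test in `cpp/test/aggregate_test.cpp` does. The sketch below mirrors that test for the MEAN kernel and is illustrative only: the function name `ReduceMeanColumn` is hypothetical, the shuffle between the combine and reduce stages is omitted, and in the real pipeline `MapredHashGroupBy` recomputes group ids after shuffling rather than reusing the local ones as done here.

#include <arrow/api.h>

#include "cylon/mapreduce/mapreduce.hpp"

// Drive a MEAN MapReduceKernel through its stages. `values` is the value column;
// `group_ids`, `group_indices` and `num_groups` come from MapToGroupKernel::Map.
cylon::Status ReduceMeanColumn(const std::shared_ptr<arrow::Array> &values,
                               const std::shared_ptr<arrow::Array> &group_ids,
                               const std::shared_ptr<arrow::Array> &group_indices,
                               int64_t num_groups,
                               std::shared_ptr<arrow::Array> *means) {
  auto kern = cylon::mapred::MakeMapReduceKernel(values->type(), cylon::compute::MEAN);
  kern->Init(arrow::default_memory_pool(), /*options=*/nullptr);

  // CombineLocally produces the intermediate arrays, {sums, counts} for MEAN.
  arrow::ArrayVector combined;
  auto status = kern->CombineLocally(values, group_ids, num_groups, &combined);
  if (!status.is_ok()) {
    return status;
  }

  // After the shuffle (omitted here), the intermediates are reduced per group again.
  arrow::ArrayVector reduced;
  status = kern->ReduceShuffledResults(combined, group_ids, group_indices, num_groups, &reduced);
  if (!status.is_ok()) {
    return status;
  }

  // Finalize divides the reduced sums by the reduced counts to produce the means.
  return kern->Finalize(reduced, means);
}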