Skip to content

Commit

Permalink
Mapreduce style group-by aggregators (#535)
Browse files Browse the repository at this point in the history
* mapred midway

* kernels done

* aggregates

* finalizing API

* adding support for single_stage_reduction

* minor api change

* adding group by perf script

* adding DistAggregateSingleStage impl

* minor fix

* minor changes

* rename method

* python bindings

* Update mapreduce.cpp

* adding more doc strings and minor refactors
  • Loading branch information
nirandaperera authored Dec 16, 2021
1 parent 50ef890 commit 3344bf9
Show file tree
Hide file tree
Showing 22 changed files with 1,787 additions and 217 deletions.
3 changes: 3 additions & 0 deletions cpp/src/cylon/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ add_library(cylon SHARED
arrow/arrow_types.hpp
column.cpp
column.hpp
compute/aggregate_kernels.cpp
compute/aggregate_kernels.hpp
compute/aggregate_utils.hpp
compute/aggregates.cpp
Expand Down Expand Up @@ -102,6 +103,8 @@ add_library(cylon SHARED
join/join_utils.hpp
join/sort_join.cpp
join/sort_join.hpp
mapreduce/mapreduce.hpp
mapreduce/mapreduce.cpp
net/channel.hpp
net/comm_operations.hpp
net/comm_type.hpp
Expand Down
26 changes: 22 additions & 4 deletions cpp/src/cylon/arrow/arrow_comparator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -781,6 +781,25 @@ Status TableRowIndexEqualTo::Make(const std::shared_ptr<arrow::Table> &table,
return Status::OK();
}

/**
 * Builds a row-index equality functor from a list of arrays. One index comparator is
 * created per array; a zero-length array gets an EmptyIndexComparator.
 * @param arrays arrays whose rows would be compared
 * @param out_equal_to [out] receives the created functor
 * @return Status::OK() on success, or the failed status of the comparator creation
 */
Status TableRowIndexEqualTo::Make(const std::vector<std::shared_ptr<arrow::Array>> &arrays,
                                  std::unique_ptr<TableRowIndexEqualTo> *out_equal_to) {
  auto comparators = std::make_shared<std::vector<std::shared_ptr<ArrayIndexComparator>>>();
  comparators->reserve(arrays.size());

  for (const auto &column: arrays) {
    // non-empty arrays get a typed comparator
    if (column->length() != 0) {
      std::unique_ptr<ArrayIndexComparator> column_comp;
      RETURN_CYLON_STATUS_IF_FAILED(CreateArrayIndexComparator(column, &column_comp));
      comparators->emplace_back(std::move(column_comp));
      continue;
    }

    // zero-length array
    comparators->emplace_back(std::make_shared<EmptyIndexComparator>());
  }

  *out_equal_to = std::make_unique<TableRowIndexEqualTo>(std::move(comparators));
  return Status::OK();
}

bool TableRowIndexEqualTo::operator()(const int64_t &record1, const int64_t &record2) const {
return std::all_of(comps->begin(), comps->end(),
[&](const std::shared_ptr<ArrayIndexComparator> &comp) {
Expand Down Expand Up @@ -839,10 +858,9 @@ Status TableRowIndexHash::Make(const std::shared_ptr<arrow::Table> &table,
Status TableRowIndexHash::Make(const std::vector<std::shared_ptr<arrow::Array>> &arrays,
std::unique_ptr<TableRowIndexHash> *hash) {
const int64_t len = arrays[0]->length();
if (std::all_of(arrays.begin() + 1, arrays.end(), [&](const std::shared_ptr<arrow::Array> &arr) {
return arr->length() == len;
})) {
return {Code::Invalid, "array lengths should be equal"};
if (std::any_of(arrays.begin() + 1, arrays.end(),
[&](const auto &arr) { return arr->length() != len; })) {
return {Code::Invalid, "TableRowIndexHash array lengths should be equal"};
}

auto hashes_ptr = std::make_shared<std::vector<uint32_t>>(len, 0);
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/cylon/arrow/arrow_comparator.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ class TableRowIndexEqualTo {

static Status Make(const std::shared_ptr<arrow::Table> &table, const std::vector<int> &col_ids,
std::unique_ptr<TableRowIndexEqualTo> *out_equal_to);
/**
 * Creates an equal-to functor from a list of arrays (one index comparator per array).
 * @param arrays arrays whose rows would be compared
 * @param out_equal_to [out] receives the created functor
 * @return Status::OK() on success, or the failed status of the comparator creation
 */
static Status Make(const std::vector<std::shared_ptr<arrow::Array>> &arrays,
std::unique_ptr<TableRowIndexEqualTo> *out_equal_to);

private:
// this class gets copied to std container, so we don't want to copy these vectors.
Expand Down
38 changes: 38 additions & 0 deletions cpp/src/cylon/compute/aggregate_kernels.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include <memory>
#include "cylon/compute/aggregate_kernels.hpp"

namespace cylon {
namespace compute {

/**
 * Creates a default-constructed aggregation op for the given op id.
 * @param id AggregationOpId
 * @return the matching op, or nullptr if the id matches none of the cases below
 */
std::shared_ptr<AggregationOp> MakeAggregationOpFromID(AggregationOpId id) {
  // stays null when the id does not match any case
  std::shared_ptr<AggregationOp> op;

  switch (id) {
    case SUM:
      op = std::make_shared<SumOp>();
      break;
    case MIN:
      op = std::make_shared<MinOp>();
      break;
    case MAX:
      op = std::make_shared<MaxOp>();
      break;
    case COUNT:
      op = std::make_shared<CountOp>();
      break;
    case MEAN:
      op = std::make_shared<MeanOp>();
      break;
    case VAR:
      op = std::make_shared<VarOp>();
      break;
    case STDDEV:
      op = std::make_shared<StdDevOp>();
      break;
    case NUNIQUE:
      op = std::make_shared<NUniqueOp>();
      break;
    case QUANTILE:
      op = std::make_shared<QuantileOp>();
      break;
  }

  return op;
}

}
}
122 changes: 71 additions & 51 deletions cpp/src/cylon/compute/aggregate_kernels.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
#ifndef CYLON_CPP_SRC_CYLON_COMPUTE_AGGREGATE_KERNELS_HPP_
#define CYLON_CPP_SRC_CYLON_COMPUTE_AGGREGATE_KERNELS_HPP_

#include <algorithm>
#include <cmath>
#include <math.h>
#include <vector>
#include <unordered_set>

#include "cylon/util/macros.hpp"

namespace cylon {
namespace compute {

Expand Down Expand Up @@ -56,11 +59,6 @@ struct KernelOptions {
virtual ~KernelOptions() = default;
};

/**
* special kernel options holder for default options. i.e. no special options.
*/
struct DefaultKernelOptions : public KernelOptions {};

/**
* Variance kernel options
*/
Expand Down Expand Up @@ -89,36 +87,41 @@ struct QuantileKernelOptions : public KernelOptions {
/**
* 2. Aggregation operation
*/
struct AggregationOp {
/**
* @param id AggregationOpId
* @param options unique_ptr of options
*/
AggregationOp(AggregationOpId id, std::unique_ptr<KernelOptions> options)
: id(id), options(std::move(options)) {}
//struct AggregationOp {
// /**
// * @param id AggregationOpId
// * @param options unique_ptr of options
// */
// AggregationOp(AggregationOpId id, KernelOptions *options)
// : id(id), options(options) {}
//
// /**
// * Constructor with uninitialized options. This would be explicitly handled in impl
// * @param id
// */
// explicit AggregationOp(AggregationOpId id) : id(id), options(nullptr) {}
//
// virtual ~AggregationOp() = default;
//
// AggregationOpId id;
// KernelOptions* options;
//};

/**
* Constructor with uninitialized options. This would be explicitly handled in impl
* @param id
*/
explicit AggregationOp(AggregationOpId id) : id(id), options(nullptr) {}

virtual ~AggregationOp() = default;

AggregationOpId id;
std::unique_ptr<KernelOptions> options;
/**
 * Interface of an aggregation operation. Implementations report their op id and,
 * optionally, the kernel options they carry.
 */
struct AggregationOp {
  // polymorphic base: a virtual destructor keeps deletion through an AggregationOp
  // pointer well-defined for every derived op
  virtual ~AggregationOp() = default;

  /**
   * @return id of this aggregation operation
   */
  virtual AggregationOpId id() const = 0;

  /**
   * @return non-owning pointer to the kernel options of this op. The base implementation
   * returns nullptr, i.e. the op carries no options.
   */
  virtual KernelOptions *options() const { return nullptr; }
};

/**
* Base aggregation operation with DefaultKernelOptions
* Base aggregation operation with KernelOptions
* @tparam ID AggregationOpId
*/
template<AggregationOpId ID>
struct BaseAggregationOp : public AggregationOp {
BaseAggregationOp() : AggregationOp(ID, std::make_unique<DefaultKernelOptions>()) {}
AggregationOpId id() const override { return ID; }

static inline std::unique_ptr<AggregationOp> Make() {
return std::make_unique<BaseAggregationOp<ID>>();
static std::shared_ptr<AggregationOp> Make() {
return std::make_shared<BaseAggregationOp<ID>>();
}
};

Expand All @@ -136,40 +139,57 @@ struct NUniqueOp : public BaseAggregationOp<NUNIQUE> {};
* Var op
*/
struct VarOp : public AggregationOp {
std::shared_ptr<VarKernelOptions> opt;

/**
* @param ddof delta degree of freedom
*/
explicit VarOp(int ddof) : AggregationOp(VAR, std::make_unique<VarKernelOptions>(ddof)) {}
explicit VarOp(int ddof = 1) : opt(std::make_shared<VarKernelOptions>(ddof)) {}
explicit VarOp(const std::shared_ptr<KernelOptions> &opt)
: opt(std::static_pointer_cast<VarKernelOptions>(opt)) {}

static inline std::unique_ptr<AggregationOp> Make(int ddof = 0) {
return std::make_unique<VarOp>(ddof);
AggregationOpId id() const override { return VAR; }
KernelOptions *options() const override { return opt.get(); }

static std::shared_ptr<AggregationOp> Make(int ddof = 1) {
return std::make_shared<VarOp>(ddof);
}
};

struct StdDevOp : public AggregationOp {
explicit StdDevOp(int ddof) : AggregationOp(STDDEV, std::make_unique<VarKernelOptions>(ddof)) {}
struct StdDevOp : public VarOp {
explicit StdDevOp(int ddof = 1) : VarOp(ddof) {}
explicit StdDevOp(const std::shared_ptr<KernelOptions> &opt) : VarOp(opt) {}
AggregationOpId id() const override { return STDDEV; }

static inline std::unique_ptr<AggregationOp> Make(int ddof = 0) {
return std::make_unique<StdDevOp>(ddof);
static std::shared_ptr<AggregationOp> Make(int ddof = 1) {
return std::make_shared<StdDevOp>(ddof);
}
};

/**
* Quantile op
*/
struct QuantileOp : public AggregationOp {
std::shared_ptr<QuantileKernelOptions> opt;

/**
* @param quantile
*/
explicit QuantileOp(double quantile) : AggregationOp(QUANTILE,
std::make_unique<QuantileKernelOptions>(
quantile)) {}
explicit QuantileOp(double quantile = 0.5)
: opt(std::make_shared<QuantileKernelOptions>(quantile)) {}
explicit QuantileOp(const std::shared_ptr<KernelOptions> &opt)
: opt(std::static_pointer_cast<QuantileKernelOptions>(opt)) {}

static inline std::unique_ptr<AggregationOp> Make(double quantile = 0.5) {
return std::make_unique<QuantileOp>(quantile);
AggregationOpId id() const override { return QUANTILE; }
KernelOptions *options() const override { return opt.get(); }

static std::shared_ptr<AggregationOp> Make(double quantile = 0.5) {
return std::make_shared<QuantileOp>(quantile);
}
};

std::shared_ptr<AggregationOp> MakeAggregationOpFromID(AggregationOpId id);

// -----------------------------------------------------------------------------

/**
Expand All @@ -193,7 +213,7 @@ template<typename T>
struct KernelTraits<AggregationOpId::SUM, T> {
using State = std::tuple<T>; // <running sum>
using ResultT = T;
using Options = DefaultKernelOptions;
using Options = KernelOptions;
static constexpr const char *name() { return "sum_"; }

};
Expand All @@ -202,7 +222,7 @@ template<typename T>
struct KernelTraits<AggregationOpId::MEAN, T> {
using State = std::tuple<T, int64_t>; // <running sum, running count>
using ResultT = T;
using Options = DefaultKernelOptions;
using Options = KernelOptions;
static constexpr const char *name() { return "mean_"; }
};

Expand All @@ -226,31 +246,31 @@ template<typename T>
struct KernelTraits<AggregationOpId::COUNT, T> {
using State = std::tuple<int64_t>;
using ResultT = int64_t;
using Options = DefaultKernelOptions;
using Options = KernelOptions;
static constexpr const char *name() { return "count_"; }
};

template<typename T>
struct KernelTraits<AggregationOpId::MIN, T> {
using State = std::tuple<T>;
using ResultT = T;
using Options = DefaultKernelOptions;
using Options = KernelOptions;
static constexpr const char *name() { return "min_"; }
};

template<typename T>
struct KernelTraits<AggregationOpId::MAX, T> {
using State = std::tuple<T>;
using ResultT = T;
using Options = DefaultKernelOptions;
using Options = KernelOptions;
static constexpr const char *name() { return "max_"; }
};

template<typename T>
struct KernelTraits<AggregationOpId::NUNIQUE, T> {
using State = std::unordered_set<T>;
using ResultT = int64_t;
using Options = DefaultKernelOptions;
using Options = KernelOptions;
static constexpr const char *name() { return "nunique_"; }
};

Expand Down Expand Up @@ -350,7 +370,7 @@ struct TypedAggregationKernel : public AggregationKernel {
template<typename T>
class MeanKernel : public TypedAggregationKernel<MeanKernel<T>, MEAN, T> {
public:
void KernelSetup(DefaultKernelOptions *options) {
void KernelSetup(KernelOptions *options) {
CYLON_UNUSED(options);
};

Expand Down Expand Up @@ -415,7 +435,7 @@ class VarianceKernel : public TypedAggregationKernel<VarianceKernel<T>, VAR, T>
*/
template<typename T>
struct SumKernel : public TypedAggregationKernel<SumKernel<T>, SUM, T> {
void KernelSetup(DefaultKernelOptions *options) {
void KernelSetup(KernelOptions *options) {
CYLON_UNUSED(options);
};
inline void KernelInitializeState(std::tuple<T> *state) const {
Expand All @@ -434,7 +454,7 @@ struct SumKernel : public TypedAggregationKernel<SumKernel<T>, SUM, T> {
*/
template<typename T>
struct CountKernel : public TypedAggregationKernel<CountKernel<T>, COUNT, T> {
void KernelSetup(DefaultKernelOptions *options) {
void KernelSetup(KernelOptions *options) {
CYLON_UNUSED(options);
};
inline void KernelInitializeState(std::tuple<int64_t> *state) const {
Expand All @@ -454,7 +474,7 @@ struct CountKernel : public TypedAggregationKernel<CountKernel<T>, COUNT, T> {
*/
template<typename T>
struct MinKernel : public TypedAggregationKernel<MinKernel<T>, MIN, T> {
void KernelSetup(DefaultKernelOptions *options) {
void KernelSetup(KernelOptions *options) {
CYLON_UNUSED(options);
};
inline void KernelInitializeState(std::tuple<T> *state) const {
Expand All @@ -473,7 +493,7 @@ struct MinKernel : public TypedAggregationKernel<MinKernel<T>, MIN, T> {
*/
template<typename T>
struct MaxKernel : public TypedAggregationKernel<MaxKernel<T>, MAX, T> {
void KernelSetup(DefaultKernelOptions *options) {
void KernelSetup(KernelOptions *options) {
CYLON_UNUSED(options);
};
inline void KernelInitializeState(std::tuple<T> *state) const {
Expand All @@ -489,7 +509,7 @@ struct MaxKernel : public TypedAggregationKernel<MaxKernel<T>, MAX, T> {

template<typename T>
struct NUniqueKernel : public TypedAggregationKernel<NUniqueKernel<T>, NUNIQUE, T> {
void KernelSetup(DefaultKernelOptions *options) {
void KernelSetup(KernelOptions *options) {
CYLON_UNUSED(options);
};
inline void KernelInitializeState(std::unordered_set<T> *state) const { CYLON_UNUSED(state); }
Expand Down
Loading

0 comments on commit 3344bf9

Please sign in to comment.