Skip to content

Commit

Permalink
Implement origin for pandas resampling (#1962)
Browse files Browse the repository at this point in the history
#### Reference Issues/PRs
<!--Example: Fixes #1234. See also #3456.-->
Resolve: #1740
Resolve: #1751 
 
#### What does this implement or fix?
Implements equivalent of pandas origin in ArcticDB's resample.

The default is `epoch`, which matches ArcticDB's behavior up to now but
differs from Pandas, whose default is `start_day`.
Changing the default to `start_day` would be a breaking change.

Fix how min and max aggregators handle infinity when there are only
infinities in the column. Previously the result was `NaN` which is not
consistent with Pandas and wrong. Now they return the proper value of
infinity.

Add hypothesis test using combinations of all possible parameters for
resampling.

`start`, `end`, `start_day`, `end_day` are not supported in conjunction
with the `date_range` clause.

#### Any other comments?

#### Checklist

<details>
  <summary>
   Checklist for code changes...
  </summary>
 
- [ ] Have you updated the relevant docstrings, documentation and
copyright notice?
- [ ] Is this contribution tested against [all ArcticDB's
features](../docs/mkdocs/docs/technical/contributing.md)?
- [ ] Do all exceptions introduced raise appropriate [error
messages](https://docs.arcticdb.io/error_messages/)?
 - [ ] Are API changes highlighted in the PR description?
- [ ] Is the PR labelled as enhancement or bug so it appears in
autogenerated release notes?
</details>

<!--
Thanks for contributing a Pull Request to ArcticDB! Please ensure you
have taken a look at:
- ArcticDB's Code of Conduct:
https://github.com/man-group/ArcticDB/blob/master/CODE_OF_CONDUCT.md
- ArcticDB's Contribution Licensing:
https://github.com/man-group/ArcticDB/blob/master/docs/mkdocs/docs/technical/contributing.md#contribution-licensing
-->

---------

Co-authored-by: Vasil Pashov <[email protected]>
  • Loading branch information
vasil-pashov and Vasil Pashov authored Dec 19, 2024
1 parent f9d955f commit 6931d3f
Show file tree
Hide file tree
Showing 14 changed files with 694 additions and 175 deletions.
83 changes: 65 additions & 18 deletions cpp/arcticdb/processing/clause.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
#include <vector>
#include <variant>

#include <folly/Poly.h>

#include <arcticdb/processing/processing_unit.hpp>
#include <arcticdb/column_store/string_pool.hpp>
#include <arcticdb/util/offset_string.hpp>
Expand All @@ -21,9 +19,13 @@
#include <arcticdb/stream/segment_aggregator.hpp>
#include <arcticdb/util/test/random_throw.hpp>
#include <ankerl/unordered_dense.h>
#include <ranges>



namespace arcticdb {

namespace ranges = std::ranges;
using namespace pipelines;

class GroupingMap {
Expand Down Expand Up @@ -431,6 +433,53 @@ std::vector<EntityId> AggregationClause::process(std::vector<EntityId>&& entity_
return str_;
}

// Constructs a resample clause.
//   rule                       - resampling rule string; stored and later handed to the bucket generator
//   label_boundary             - boundary stored in label_boundary_
//   generate_bucket_boundaries - callback producing the bucket boundaries; ownership is taken via move
//   offset                     - offset forwarded to the bucket generator
//   origin                     - origin (string or timestamp) forwarded to the bucket generator
// The clause info is set up for time-bucketed input which modifies the output descriptor, keeps the current
// top-level index and cannot be combined with column selection.
template<ResampleBoundary closed_boundary>
ResampleClause<closed_boundary>::ResampleClause(
        std::string rule,
        ResampleBoundary label_boundary,
        BucketGeneratorT&& generate_bucket_boundaries,
        timestamp offset,
        ResampleOrigin origin
) :
    rule_(std::move(rule)),
    label_boundary_(label_boundary),
    generate_bucket_boundaries_(std::move(generate_bucket_boundaries)),
    offset_(offset),
    origin_(std::move(origin)) {
    clause_info_.index_ = KeepCurrentTopLevelIndex();
    clause_info_.modifies_output_descriptor_ = true;
    clause_info_.can_combine_with_column_selection_ = false;
    clause_info_.input_structure_ = ProcessingStructure::TIME_BUCKETED;
}

// Read-only access to the ClauseInfo describing this clause.
template<ResampleBoundary closed_boundary>
const ClauseInfo& ResampleClause<closed_boundary>::clause_info() const {
    const ClauseInfo& info = this->clause_info_;
    return info;
}

// Stores the component manager for this clause. The shared_ptr is taken by value and moved into the member.
template<ResampleBoundary closed_boundary>
void ResampleClause<closed_boundary>::set_component_manager(
        std::shared_ptr<ComponentManager> component_manager
) {
    this->component_manager_ = std::move(component_manager);
}

// Returns a copy of the stored resampling rule string.
template<ResampleBoundary closed_boundary>
std::string ResampleClause<closed_boundary>::rule() const {
    return this->rule_;
}

// Records the user-requested date range [date_range_start, date_range_end] for this clause.
// Raises E_INVALID_USER_ARGUMENT if the clause's origin is one of the string origins that cannot be used
// together with a date range.
template<ResampleBoundary closed_boundary>
void ResampleClause<closed_boundary>::set_date_range(timestamp date_range_start, timestamp date_range_end) {
    // The "start"/"end" family of origins needs the first and last segments of the date range to be read,
    // but at the moment the buckets are set up before any data is read or processed.
    constexpr static std::array unsupported_origin{ "start", "end", "start_day", "end_day" };
    const bool origin_is_supported = util::variant_match(
        origin_,
        [&](const std::string& origin_name) {
            const bool is_unsupported = ranges::any_of(
                unsupported_origin,
                [&](std::string_view candidate) { return candidate == origin_name; }
            );
            return !is_unsupported;
        },
        // Any non-string origin is always accepted.
        [](const auto&) { return true; }
    );
    user_input::check<ErrorCode::E_INVALID_USER_ARGUMENT>(
        origin_is_supported,
        "Resampling origins {} are not supported in conjunction with date range",
        unsupported_origin
    );
    date_range_.emplace(date_range_start, date_range_end);
}

template<ResampleBoundary closed_boundary>
void ResampleClause<closed_boundary>::set_aggregations(const std::vector<NamedAggregator>& named_aggregators) {
clause_info_.input_columns_ = std::make_optional<std::unordered_set<std::string>>();
Expand Down Expand Up @@ -475,28 +524,27 @@ std::vector<std::vector<size_t>> ResampleClause<closed_boundary>::structure_for_
if (ranges_and_keys.empty()) {
return {};
}
TimestampRange index_range(
std::min_element(ranges_and_keys.begin(), ranges_and_keys.end(),
[](const RangesAndKey& left, const RangesAndKey& right) {
return left.start_time() < right.start_time();
})->start_time(),
std::max_element(ranges_and_keys.begin(), ranges_and_keys.end(),
[](const RangesAndKey& left, const RangesAndKey& right) {
return left.end_time() < right.end_time();
})->end_time()
);

// Iterate over ranges_and_keys and create a pair with first element equal to the smallest start time and second
// element equal to the largest end time.
const TimestampRange index_range = std::accumulate(
std::next(ranges_and_keys.begin()),
ranges_and_keys.end(),
TimestampRange{ ranges_and_keys.begin()->start_time(), ranges_and_keys.begin()->end_time() },
[](const TimestampRange& rng, const RangesAndKey& el) { return TimestampRange{std::min(rng.first, el.start_time()), std::max(rng.second, el.end_time())};});

if (date_range_.has_value()) {
date_range_->first = std::max(date_range_->first, index_range.first);
date_range_->second = std::min(date_range_->second, index_range.second);
} else {
date_range_ = index_range;
}

bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_);
bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_, origin_);
if (bucket_boundaries_.size() < 2) {
return {};
}
debug::check<ErrorCode::E_ASSERTION_FAILURE>(std::is_sorted(bucket_boundaries_.begin(), bucket_boundaries_.end()),
debug::check<ErrorCode::E_ASSERTION_FAILURE>(ranges::is_sorted(bucket_boundaries_),
"Resampling expects provided bucket boundaries to be strictly monotonically increasing");
return structure_by_time_bucket<closed_boundary>(ranges_and_keys, bucket_boundaries_);
}
Expand All @@ -521,12 +569,11 @@ std::vector<std::vector<EntityId>> ResampleClause<closed_boundary>::structure_fo
}

date_range_ = std::make_optional<TimestampRange>(min_start_ts, max_end_ts);

bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_);
bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_, origin_);
if (bucket_boundaries_.size() < 2) {
return {};
}
debug::check<ErrorCode::E_ASSERTION_FAILURE>(std::is_sorted(bucket_boundaries_.begin(), bucket_boundaries_.end()),
debug::check<ErrorCode::E_ASSERTION_FAILURE>(ranges::is_sorted(bucket_boundaries_),
"Resampling expects provided bucket boundaries to be strictly monotonically increasing");

auto new_structure_offsets = structure_by_time_bucket<closed_boundary>(ranges_and_entities, bucket_boundaries_);
Expand All @@ -541,7 +588,7 @@ std::vector<std::vector<EntityId>> ResampleClause<closed_boundary>::structure_fo
}
}
internal::check<ErrorCode::E_ASSERTION_FAILURE>(
std::all_of(expected_fetch_counts.begin(), expected_fetch_counts.end(), [](EntityFetchCount fetch_count) {
ranges::all_of(expected_fetch_counts, [](EntityFetchCount fetch_count) {
return fetch_count == 1 || fetch_count == 2;
}),
"ResampleClause::structure_for_processing: invalid expected entity fetch count (should be 1 or 2)"
Expand Down
40 changes: 14 additions & 26 deletions cpp/arcticdb/processing/clause.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@

namespace arcticdb {

// Origin used when generating resampling bucket boundaries: either a named origin given as a string
// or an explicit timestamp.
using ResampleOrigin = std::variant<std::string, timestamp>;

using RangesAndKey = pipelines::RangesAndKey;
using SliceAndKey = pipelines::SliceAndKey;

Expand Down Expand Up @@ -317,6 +319,7 @@ struct AggregationClause {

template<ResampleBoundary closed_boundary>
struct ResampleClause {
using BucketGeneratorT = std::function<std::vector<timestamp>(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, const ResampleOrigin&)>;
ClauseInfo clause_info_;
std::shared_ptr<ComponentManager> component_manager_;
ProcessingConfig processing_config_;
Expand All @@ -325,29 +328,22 @@ struct ResampleClause {
// This will contain the data range specified by the user (if any) intersected with the range of timestamps for the symbol
std::optional<TimestampRange> date_range_;
// Inject this as a callback in the ctor to avoid language-specific dependencies this low down in the codebase
std::function<std::vector<timestamp>(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp)> generate_bucket_boundaries_;
BucketGeneratorT generate_bucket_boundaries_;
std::vector<timestamp> bucket_boundaries_;
std::vector<SortedAggregatorInterface> aggregators_;
std::string str_;
timestamp offset_;
ResampleOrigin origin_;

ResampleClause() = delete;

ARCTICDB_MOVE_COPY_DEFAULT(ResampleClause)

ResampleClause(const std::string& rule,
ResampleBoundary label_boundary,
std::function<std::vector<timestamp>(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp)>&& generate_bucket_boundaries,
timestamp offset):
rule_(rule),
label_boundary_(label_boundary),
generate_bucket_boundaries_(std::move(generate_bucket_boundaries)),
offset_(offset) {
clause_info_.input_structure_ = ProcessingStructure::TIME_BUCKETED;
clause_info_.can_combine_with_column_selection_ = false;
clause_info_.modifies_output_descriptor_ = true;
clause_info_.index_ = KeepCurrentTopLevelIndex();
}
ResampleClause(std::string rule,
ResampleBoundary label_boundary,
BucketGeneratorT&& generate_bucket_boundaries,
timestamp offset,
ResampleOrigin origin);

[[nodiscard]] std::vector<std::vector<size_t>> structure_for_processing(
std::vector<RangesAndKey>& ranges_and_keys);
Expand All @@ -356,27 +352,19 @@ struct ResampleClause {

[[nodiscard]] std::vector<EntityId> process(std::vector<EntityId>&& entity_ids) const;

[[nodiscard]] const ClauseInfo& clause_info() const {
return clause_info_;
}
[[nodiscard]] const ClauseInfo& clause_info() const;

void set_processing_config(const ProcessingConfig& processing_config);

void set_component_manager(std::shared_ptr<ComponentManager> component_manager) {
component_manager_ = component_manager;
}
void set_component_manager(std::shared_ptr<ComponentManager> component_manager);

[[nodiscard]] std::string to_string() const;

[[nodiscard]] std::string rule() const {
return rule_;
}
[[nodiscard]] std::string rule() const;

void set_aggregations(const std::vector<NamedAggregator>& named_aggregators);

void set_date_range(timestamp date_range_start, timestamp date_range_end) {
date_range_.emplace(date_range_start, date_range_end);
}
void set_date_range(timestamp date_range_start, timestamp date_range_end);

std::vector<timestamp> generate_bucket_boundaries(timestamp first_ts,
timestamp last_ts,
Expand Down
72 changes: 70 additions & 2 deletions cpp/arcticdb/processing/clause_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
*/

#include <arcticdb/processing/clause_utils.hpp>
#include <ranges>


namespace arcticdb {

namespace arcticdb {
namespace ranges = std::ranges;
using namespace pipelines;

std::vector<std::vector<EntityId>> structure_by_row_slice(ComponentManager& component_manager, std::vector<std::vector<EntityId>>&& entity_ids_vec) {
Expand Down Expand Up @@ -101,9 +104,74 @@ std::shared_ptr<std::vector<EntityFetchCount>> generate_segment_fetch_counts(
}
}
debug::check<ErrorCode::E_ASSERTION_FAILURE>(
std::all_of(res->begin(), res->end(), [](const size_t& val) { return val != 0; }),
ranges::none_of(*res, [](size_t val) { return val == 0; }),
"All segments should be needed by at least one ProcessingUnit");
return res;
}

// Groups row-slices into processing units so that each unit holds every row-slice needed by the time buckets it covers.
//
// ranges            - row-slice descriptors (RangesAndKey or RangesAndEntity). Modified in place: entries for which
//                     index_range_outside_bucket_range returns true are erased before structuring.
// bucket_boundaries - bucket boundary timestamps.
//
// Returns one vector of indexes into the (filtered) `ranges` per processing unit. The initial grouping comes from
// structure_by_row_slice; neighbouring groups are then merged when a bucket spans them.
template<ResampleBoundary closed_boundary, typename T>
requires std::is_same_v<T, RangesAndKey> || std::is_same_v<T, RangesAndEntity>
std::vector<std::vector<size_t>> structure_by_time_bucket(
std::vector<T>& ranges,
const std::vector<timestamp>& bucket_boundaries) {
// Drop every range that index_range_outside_bucket_range reports as lying outside the bucket boundaries
std::erase_if(ranges, [&bucket_boundaries](const T &range) {
auto start_index = range.start_time();
auto end_index = range.end_time();
return index_range_outside_bucket_range<closed_boundary>(start_index, end_index, bucket_boundaries);
});
auto res = structure_by_row_slice(ranges);
// Element i of res also needs the values from element i+1 if there is a bucket which incorporates the last index
// value of row-slice i and the first value of row-slice i+1
// Element i+1 should be removed if the last bucket involved in element i covers all the index values in element i+1
auto bucket_boundaries_it = std::cbegin(bucket_boundaries);
// Exit if res_it == std::prev(res.end()) as this implies the last row slice was not incorporated into an earlier processing unit
// The loop header has no increment: res_it is only moved forward at the bottom of the if-block below
for (auto res_it = res.begin(); res_it != res.end() && res_it != std::prev(res.end());) {
auto last_index_value_in_row_slice = ranges[res_it->at(0)].end_time();
advance_boundary_past_value<closed_boundary>(bucket_boundaries, bucket_boundaries_it, last_index_value_in_row_slice);
// bucket_boundaries_it now contains the end value of the last bucket covering the row-slice in res_it, or an end iterator if the last bucket ends before the end of this row-slice
// NOTE(review): if bucket_boundaries_it is the end iterator here, neither res_it nor res is changed in this
// iteration; termination then presumably relies on the earlier erase_if having removed any later row-slices - confirm
if (bucket_boundaries_it != bucket_boundaries.end()) {
// NOTE(review): std::prev assumes advance_boundary_past_value never leaves the iterator at begin() - confirm
Bucket<closed_boundary> current_bucket{ *std::prev(bucket_boundaries_it), *bucket_boundaries_it };
auto next_row_slice_it = std::next(res_it);
while (next_row_slice_it != res.end()) {
// end_index from the key is 1 nanosecond larger than the index value of the last row in the row-slice
TimestampRange next_row_slice_timestamp_range{
ranges[next_row_slice_it->at(0)].start_time(),
ranges[next_row_slice_it->at(0)].end_time() };
if (current_bucket.contains(next_row_slice_timestamp_range.first)) {
// The last bucket in the current processing unit overlaps with the first index value in the next row slice, so add segments into current processing unit
res_it->insert(res_it->end(), next_row_slice_it->begin(), next_row_slice_it->end());
if (current_bucket.contains(next_row_slice_timestamp_range.second)) {
// The last bucket in the current processing unit wholly contains the next row slice, so remove it from the result
// erase returns the iterator to the following element, so the while loop continues with the next row slice
next_row_slice_it = res.erase(next_row_slice_it);
} else {
break;
}
} else {
break;
}
}
// This is the last bucket, and all the required row-slices have been incorporated into the current processing unit, so erase the rest
if (bucket_boundaries_it == std::prev(bucket_boundaries.end())) {
res.erase(next_row_slice_it, res.end());
break;
}
// Continue from the first row slice that was not wholly absorbed into the current processing unit
res_it = next_row_slice_it;
}
}
return res;
}

// Explicit instantiations of structure_by_time_bucket for every supported combination of closed boundary
// (LEFT, RIGHT) and range type (RangesAndKey, RangesAndEntity). These are required because the template is
// defined in this translation unit rather than in the header.
template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::LEFT, RangesAndKey>(
std::vector<RangesAndKey>& ranges,
const std::vector<timestamp>& bucket_boundaries);
template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::RIGHT, RangesAndKey>(
std::vector<RangesAndKey>& ranges,
const std::vector<timestamp>& bucket_boundaries);
template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::LEFT, RangesAndEntity>(
std::vector<RangesAndEntity>& ranges,
const std::vector<timestamp>& bucket_boundaries);
template std::vector<std::vector<size_t>> structure_by_time_bucket<ResampleBoundary::RIGHT, RangesAndEntity>(
std::vector<RangesAndEntity>& ranges,
const std::vector<timestamp>& bucket_boundaries);

}
50 changes: 2 additions & 48 deletions cpp/arcticdb/processing/clause_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,54 +157,8 @@ void advance_boundary_past_value(const std::vector<timestamp>& bucket_boundaries
template<ResampleBoundary closed_boundary, typename T>
requires std::is_same_v<T, RangesAndKey> || std::is_same_v<T, RangesAndEntity>
std::vector<std::vector<size_t>> structure_by_time_bucket(
std::vector<T>& ranges,
const std::vector<timestamp>& bucket_boundaries) {
std::erase_if(ranges, [&bucket_boundaries](const T &range) {
auto start_index = range.start_time();
auto end_index = range.end_time();
return index_range_outside_bucket_range<closed_boundary>(start_index, end_index, bucket_boundaries);
});
auto res = structure_by_row_slice(ranges);
// Element i of res also needs the values from element i+1 if there is a bucket which incorporates the last index
// value of row-slice i and the first value of row-slice i+1
// Element i+1 should be removed if the last bucket involved in element i covers all the index values in element i+1
auto bucket_boundaries_it = std::cbegin(bucket_boundaries);
// Exit if res_it == std::prev(res.end()) as this implies the last row slice was not incorporated into an earlier processing unit
for (auto res_it = res.begin(); res_it != res.end() && res_it != std::prev(res.end());) {
auto last_index_value_in_row_slice = ranges[res_it->at(0)].end_time();
advance_boundary_past_value<closed_boundary>(bucket_boundaries, bucket_boundaries_it, last_index_value_in_row_slice);
// bucket_boundaries_it now contains the end value of the last bucket covering the row-slice in res_it, or an end iterator if the last bucket ends before the end of this row-slice
if (bucket_boundaries_it != bucket_boundaries.end()) {
Bucket<closed_boundary> current_bucket{*std::prev(bucket_boundaries_it), *bucket_boundaries_it};
auto next_row_slice_it = std::next(res_it);
while (next_row_slice_it != res.end()) {
// end_index from the key is 1 nanosecond larger than the index value of the last row in the row-slice
TimestampRange next_row_slice_timestamp_range{
ranges[next_row_slice_it->at(0)].start_time(),
ranges[next_row_slice_it->at(0)].end_time()};
if (current_bucket.contains(next_row_slice_timestamp_range.first)) {
// The last bucket in the current processing unit overlaps with the first index value in the next row slice, so add segments into current processing unit
res_it->insert(res_it->end(), next_row_slice_it->begin(), next_row_slice_it->end());
if (current_bucket.contains(next_row_slice_timestamp_range.second)) {
// The last bucket in the current processing unit wholly contains the next row slice, so remove it from the result
next_row_slice_it = res.erase(next_row_slice_it);
} else {
break;
}
} else {
break;
}
}
// This is the last bucket, and all the required row-slices have been incorporated into the current processing unit, so erase the rest
if (bucket_boundaries_it == std::prev(bucket_boundaries.end())) {
res.erase(next_row_slice_it, res.end());
break;
}
res_it = next_row_slice_it;
}
}
return res;
}
std::vector<T>& ranges,
const std::vector<timestamp>& bucket_boundaries);

std::vector<std::vector<EntityId>> structure_by_row_slice(ComponentManager& component_manager, std::vector<std::vector<EntityId>>&& entity_ids_vec);

Expand Down
Loading

0 comments on commit 6931d3f

Please sign in to comment.