diff --git a/cpp/arcticdb/processing/clause.cpp b/cpp/arcticdb/processing/clause.cpp index 82ce052969..308bb0d2f0 100644 --- a/cpp/arcticdb/processing/clause.cpp +++ b/cpp/arcticdb/processing/clause.cpp @@ -8,8 +8,6 @@ #include #include -#include - #include #include #include @@ -21,9 +19,13 @@ #include #include #include +#include + + namespace arcticdb { +namespace ranges = std::ranges; using namespace pipelines; class GroupingMap { @@ -431,6 +433,53 @@ std::vector AggregationClause::process(std::vector&& entity_ return str_; } +template +ResampleClause::ResampleClause(std::string rule, + ResampleBoundary label_boundary, + BucketGeneratorT&& generate_bucket_boundaries, + timestamp offset, + ResampleOrigin origin) : + rule_(std::move(rule)), + label_boundary_(label_boundary), + generate_bucket_boundaries_(std::move(generate_bucket_boundaries)), + offset_(offset), + origin_(std::move(origin)) { + clause_info_.input_structure_ = ProcessingStructure::TIME_BUCKETED; + clause_info_.can_combine_with_column_selection_ = false; + clause_info_.modifies_output_descriptor_ = true; + clause_info_.index_ = KeepCurrentTopLevelIndex(); +} + +template +const ClauseInfo& ResampleClause::clause_info() const { + return clause_info_; +} + +template +void ResampleClause::set_component_manager(std::shared_ptr component_manager) { + component_manager_ = std::move(component_manager); +} + +template +std::string ResampleClause::rule() const { + return rule_; +} + +template +void ResampleClause::set_date_range(timestamp date_range_start, timestamp date_range_end) { + // Start and end need to read the first and last segments of the date range. At the moment buckets are set up before + // reading and processing the data. + constexpr static std::array unsupported_origin{ "start", "end", "start_day", "end_day" }; + user_input::check( + util::variant_match(origin_, + [&](const std::string& origin) { return ranges::none_of(unsupported_origin, [&](std::string_view el) { return el == origin; }); }, + [](const auto&) { return true;} + ), + "Resampling origins {} are not supported in conjunction with date range", unsupported_origin + ); + date_range_.emplace(date_range_start, date_range_end); +} + template void ResampleClause::set_aggregations(const std::vector& named_aggregators) { clause_info_.input_columns_ = std::make_optional>(); @@ -475,16 +524,15 @@ std::vector> ResampleClause::structure_for_ if (ranges_and_keys.empty()) { return {}; } - TimestampRange index_range( - std::min_element(ranges_and_keys.begin(), ranges_and_keys.end(), - [](const RangesAndKey& left, const RangesAndKey& right) { - return left.start_time() < right.start_time(); - })->start_time(), - std::max_element(ranges_and_keys.begin(), ranges_and_keys.end(), - [](const RangesAndKey& left, const RangesAndKey& right) { - return left.end_time() < right.end_time(); - })->end_time() - ); + + // Iterate over ranges_and_keys and create a pair with first element equal to the smallest start time and second + // element equal to the largest end time. + const TimestampRange index_range = std::accumulate( + std::next(ranges_and_keys.begin()), + ranges_and_keys.end(), + TimestampRange{ ranges_and_keys.begin()->start_time(), ranges_and_keys.begin()->end_time() }, + [](const TimestampRange& rng, const RangesAndKey& el) { return TimestampRange{std::min(rng.first, el.start_time()), std::max(rng.second, el.end_time())};}); + if (date_range_.has_value()) { date_range_->first = std::max(date_range_->first, index_range.first); date_range_->second = std::min(date_range_->second, index_range.second); @@ -492,11 +540,11 @@ std::vector> ResampleClause::structure_for_ date_range_ = index_range; } - bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_); + bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_, origin_); if (bucket_boundaries_.size() < 2) { return {}; } - debug::check(std::is_sorted(bucket_boundaries_.begin(), bucket_boundaries_.end()), + debug::check(ranges::is_sorted(bucket_boundaries_), "Resampling expects provided bucket boundaries to be strictly monotonically increasing"); return structure_by_time_bucket(ranges_and_keys, bucket_boundaries_); } @@ -521,12 +569,11 @@ std::vector> ResampleClause::structure_fo } date_range_ = std::make_optional(min_start_ts, max_end_ts); - - bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_); + bucket_boundaries_ = generate_bucket_boundaries_(date_range_->first, date_range_->second, rule_, closed_boundary, offset_, origin_); if (bucket_boundaries_.size() < 2) { return {}; } - debug::check(std::is_sorted(bucket_boundaries_.begin(), bucket_boundaries_.end()), + debug::check(ranges::is_sorted(bucket_boundaries_), "Resampling expects provided bucket boundaries to be strictly monotonically increasing"); auto new_structure_offsets = structure_by_time_bucket(ranges_and_entities, bucket_boundaries_); @@ -541,7 +588,7 @@ std::vector> ResampleClause::structure_fo } } internal::check( - std::all_of(expected_fetch_counts.begin(), expected_fetch_counts.end(), [](EntityFetchCount fetch_count) { + ranges::all_of(expected_fetch_counts, [](EntityFetchCount fetch_count) { return fetch_count == 1 || fetch_count == 2; }), "ResampleClause::structure_for_processing: invalid expected entity fetch count (should be 1 or 2)" diff --git a/cpp/arcticdb/processing/clause.hpp b/cpp/arcticdb/processing/clause.hpp index 901b622b79..134780d8db 100644 --- a/cpp/arcticdb/processing/clause.hpp +++ b/cpp/arcticdb/processing/clause.hpp @@ -36,6 +36,8 @@ namespace arcticdb { +using ResampleOrigin = std::variant; + using RangesAndKey = pipelines::RangesAndKey; using SliceAndKey = pipelines::SliceAndKey; @@ -317,6 +319,7 @@ struct AggregationClause { template struct ResampleClause { + using BucketGeneratorT = std::function(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, const ResampleOrigin&)>; ClauseInfo clause_info_; std::shared_ptr component_manager_; ProcessingConfig processing_config_; @@ -325,29 +328,22 @@ struct ResampleClause { // This will contain the data range specified by the user (if any) intersected with the range of timestamps for the symbol std::optional date_range_; // Inject this as a callback in the ctor to avoid language-specific dependencies this low down in the codebase - std::function(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp)> generate_bucket_boundaries_; + BucketGeneratorT generate_bucket_boundaries_; std::vector bucket_boundaries_; std::vector aggregators_; std::string str_; timestamp offset_; + ResampleOrigin origin_; ResampleClause() = delete; ARCTICDB_MOVE_COPY_DEFAULT(ResampleClause) - ResampleClause(const std::string& rule, - ResampleBoundary label_boundary, - std::function(timestamp, timestamp, std::string_view, ResampleBoundary, timestamp)>&& generate_bucket_boundaries, - timestamp offset): - rule_(rule), - label_boundary_(label_boundary), - generate_bucket_boundaries_(std::move(generate_bucket_boundaries)), - offset_(offset) { - clause_info_.input_structure_ = ProcessingStructure::TIME_BUCKETED; - clause_info_.can_combine_with_column_selection_ = false; - clause_info_.modifies_output_descriptor_ = true; - clause_info_.index_ = KeepCurrentTopLevelIndex(); - } + ResampleClause(std::string rule, + ResampleBoundary label_boundary, + BucketGeneratorT&& generate_bucket_boundaries, + timestamp offset, + ResampleOrigin origin); [[nodiscard]] std::vector> structure_for_processing( std::vector& ranges_and_keys); @@ -356,27 +352,19 @@ struct ResampleClause { [[nodiscard]] std::vector process(std::vector&& entity_ids) const; - [[nodiscard]] const ClauseInfo& clause_info() const { - return clause_info_; - } + [[nodiscard]] const ClauseInfo& clause_info() const; void set_processing_config(const ProcessingConfig& processing_config); - void set_component_manager(std::shared_ptr component_manager) { - component_manager_ = component_manager; - } + void set_component_manager(std::shared_ptr component_manager); [[nodiscard]] std::string to_string() const; - [[nodiscard]] std::string rule() const { - return rule_; - } + [[nodiscard]] std::string rule() const; void set_aggregations(const std::vector& named_aggregators); - void set_date_range(timestamp date_range_start, timestamp date_range_end) { - date_range_.emplace(date_range_start, date_range_end); - } + void set_date_range(timestamp date_range_start, timestamp date_range_end); std::vector generate_bucket_boundaries(timestamp first_ts, timestamp last_ts, diff --git a/cpp/arcticdb/processing/clause_utils.cpp b/cpp/arcticdb/processing/clause_utils.cpp index 6b40a64e67..910dfa4c84 100644 --- a/cpp/arcticdb/processing/clause_utils.cpp +++ b/cpp/arcticdb/processing/clause_utils.cpp @@ -6,9 +6,12 @@ */ #include +#include + -namespace arcticdb { +namespace arcticdb { +namespace ranges = std::ranges; using namespace pipelines; std::vector> structure_by_row_slice(ComponentManager& component_manager, std::vector>&& entity_ids_vec) { @@ -101,9 +104,74 @@ std::shared_ptr> generate_segment_fetch_counts( } } debug::check( - std::all_of(res->begin(), res->end(), [](const size_t& val) { return val != 0; }), + ranges::none_of(*res, [](size_t val) { return val == 0; }), "All segments should be needed by at least one ProcessingUnit"); return res; } +template +requires std::is_same_v || std::is_same_v +std::vector> structure_by_time_bucket( + std::vector& ranges, + const std::vector& bucket_boundaries) { + std::erase_if(ranges, [&bucket_boundaries](const T &range) { + auto start_index = range.start_time(); + auto end_index = range.end_time(); + return index_range_outside_bucket_range(start_index, end_index, bucket_boundaries); + }); + auto res = structure_by_row_slice(ranges); + // Element i of res also needs the values from element i+1 if there is a bucket which incorporates the last index + // value of row-slice i and the first value of row-slice i+1 + // Element i+1 should be removed if the last bucket involved in element i covers all the index values in element i+1 + auto bucket_boundaries_it = std::cbegin(bucket_boundaries); + // Exit if res_it == std::prev(res.end()) as this implies the last row slice was not incorporated into an earlier processing unit + for (auto res_it = res.begin(); res_it != res.end() && res_it != std::prev(res.end());) { + auto last_index_value_in_row_slice = ranges[res_it->at(0)].end_time(); + advance_boundary_past_value(bucket_boundaries, bucket_boundaries_it, last_index_value_in_row_slice); + // bucket_boundaries_it now contains the end value of the last bucket covering the row-slice in res_it, or an end iterator if the last bucket ends before the end of this row-slice + if (bucket_boundaries_it != bucket_boundaries.end()) { + Bucket current_bucket{ *std::prev(bucket_boundaries_it), *bucket_boundaries_it }; + auto next_row_slice_it = std::next(res_it); + while (next_row_slice_it != res.end()) { + // end_index from the key is 1 nanosecond larger than the index value of the last row in the row-slice + TimestampRange next_row_slice_timestamp_range{ + ranges[next_row_slice_it->at(0)].start_time(), + ranges[next_row_slice_it->at(0)].end_time() }; + if (current_bucket.contains(next_row_slice_timestamp_range.first)) { + // The last bucket in the current processing unit overlaps with the first index value in the next row slice, so add segments into current processing unit + res_it->insert(res_it->end(), next_row_slice_it->begin(), next_row_slice_it->end()); + if (current_bucket.contains(next_row_slice_timestamp_range.second)) { + // The last bucket in the current processing unit wholly contains the next row slice, so remove it from the result + next_row_slice_it = res.erase(next_row_slice_it); + } else { + break; + } + } else { + break; + } + } + // This is the last bucket, and all the required row-slices have been incorporated into the current processing unit, so erase the rest + if (bucket_boundaries_it == std::prev(bucket_boundaries.end())) { + res.erase(next_row_slice_it, res.end()); + break; + } + res_it = next_row_slice_it; + } + } + return res; +} + +template std::vector> structure_by_time_bucket( + std::vector& ranges, + const std::vector& bucket_boundaries); +template std::vector> structure_by_time_bucket( + std::vector& ranges, + const std::vector& bucket_boundaries); +template std::vector> structure_by_time_bucket( + std::vector& ranges, + const std::vector& bucket_boundaries); +template std::vector> structure_by_time_bucket( + std::vector& ranges, + const std::vector& bucket_boundaries); + } diff --git a/cpp/arcticdb/processing/clause_utils.hpp b/cpp/arcticdb/processing/clause_utils.hpp index b0d7c044ec..7d0942c23d 100644 --- a/cpp/arcticdb/processing/clause_utils.hpp +++ b/cpp/arcticdb/processing/clause_utils.hpp @@ -157,54 +157,8 @@ void advance_boundary_past_value(const std::vector& bucket_boundaries template requires std::is_same_v || std::is_same_v std::vector> structure_by_time_bucket( - std::vector& ranges, - const std::vector& bucket_boundaries) { - std::erase_if(ranges, [&bucket_boundaries](const T &range) { - auto start_index = range.start_time(); - auto end_index = range.end_time(); - return index_range_outside_bucket_range(start_index, end_index, bucket_boundaries); - }); - auto res = structure_by_row_slice(ranges); - // Element i of res also needs the values from element i+1 if there is a bucket which incorporates the last index - // value of row-slice i and the first value of row-slice i+1 - // Element i+1 should be removed if the last bucket involved in element i covers all the index values in element i+1 - auto bucket_boundaries_it = std::cbegin(bucket_boundaries); - // Exit if res_it == std::prev(res.end()) as this implies the last row slice was not incorporated into an earlier processing unit - for (auto res_it = res.begin(); res_it != res.end() && res_it != std::prev(res.end());) { - auto last_index_value_in_row_slice = ranges[res_it->at(0)].end_time(); - advance_boundary_past_value(bucket_boundaries, bucket_boundaries_it, last_index_value_in_row_slice); - // bucket_boundaries_it now contains the end value of the last bucket covering the row-slice in res_it, or an end iterator if the last bucket ends before the end of this row-slice - if (bucket_boundaries_it != bucket_boundaries.end()) { - Bucket current_bucket{*std::prev(bucket_boundaries_it), *bucket_boundaries_it}; - auto next_row_slice_it = std::next(res_it); - while (next_row_slice_it != res.end()) { - // end_index from the key is 1 nanosecond larger than the index value of the last row in the row-slice - TimestampRange next_row_slice_timestamp_range{ - ranges[next_row_slice_it->at(0)].start_time(), - ranges[next_row_slice_it->at(0)].end_time()}; - if (current_bucket.contains(next_row_slice_timestamp_range.first)) { - // The last bucket in the current processing unit overlaps with the first index value in the next row slice, so add segments into current processing unit - res_it->insert(res_it->end(), next_row_slice_it->begin(), next_row_slice_it->end()); - if (current_bucket.contains(next_row_slice_timestamp_range.second)) { - // The last bucket in the current processing unit wholly contains the next row slice, so remove it from the result - next_row_slice_it = res.erase(next_row_slice_it); - } else { - break; - } - } else { - break; - } - } - // This is the last bucket, and all the required row-slices have been incorporated into the current processing unit, so erase the rest - if (bucket_boundaries_it == std::prev(bucket_boundaries.end())) { - res.erase(next_row_slice_it, res.end()); - break; - } - res_it = next_row_slice_it; - } - } - return res; -} + std::vector& ranges, + const std::vector& bucket_boundaries); std::vector> structure_by_row_slice(ComponentManager& component_manager, std::vector>&& entity_ids_vec); diff --git a/cpp/arcticdb/processing/sorted_aggregation.hpp b/cpp/arcticdb/processing/sorted_aggregation.hpp index 07ba30524a..440d58cfd7 100644 --- a/cpp/arcticdb/processing/sorted_aggregation.hpp +++ b/cpp/arcticdb/processing/sorted_aggregation.hpp @@ -160,7 +160,7 @@ class MinAggregatorSorted { void push(T value) { if constexpr (std::is_floating_point_v) { if (ARCTICDB_LIKELY(!std::isnan(value))) { - min_ = std::min(min_.value_or(std::numeric_limits::max()), value); + min_ = std::min(min_.value_or(std::numeric_limits::infinity()), value); } } else if constexpr (std::is_same_v && TimeType) { if (ARCTICDB_LIKELY(value != NaT)) { @@ -202,7 +202,7 @@ class MaxAggregatorSorted { void push(T value) { if constexpr (std::is_floating_point_v) { if (ARCTICDB_LIKELY(!std::isnan(value))) { - max_ = std::max(max_.value_or(std::numeric_limits::lowest()), value); + max_ = std::max(max_.value_or(-std::numeric_limits::infinity()), value); } } else if constexpr (std::is_same_v && TimeType) { if (ARCTICDB_LIKELY(value != NaT)) { diff --git a/cpp/arcticdb/processing/test/rapidcheck_resample.cpp b/cpp/arcticdb/processing/test/rapidcheck_resample.cpp index 14285d7a91..9f7c3fe2b2 100644 --- a/cpp/arcticdb/processing/test/rapidcheck_resample.cpp +++ b/cpp/arcticdb/processing/test/rapidcheck_resample.cpp @@ -17,7 +17,7 @@ using namespace arcticdb; auto generate_bucket_boundaries(std::vector&& bucket_boundaries) { - return [bucket_boundaries = std::move(bucket_boundaries)](timestamp, timestamp, std::string_view, ResampleBoundary, timestamp) mutable { + return [bucket_boundaries = std::move(bucket_boundaries)](timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, ResampleOrigin) mutable { return bucket_boundaries; }; } @@ -113,11 +113,11 @@ RC_GTEST_PROP(Resample, StructureForProcessing, ()) { } if (left_boundary_closed) { - ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0}; + ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0, 0}; auto result = resample_clause.structure_for_processing(ranges_and_keys); RC_ASSERT(expected_result == result); } else { - ResampleClause resample_clause{"dummy", ResampleBoundary::RIGHT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0}; + ResampleClause resample_clause{"dummy", ResampleBoundary::RIGHT, generate_bucket_boundaries(std::move(bucket_boundaries)), 0, 0}; auto result = resample_clause.structure_for_processing(ranges_and_keys); RC_ASSERT(expected_result == result); } diff --git a/cpp/arcticdb/processing/test/test_resample.cpp b/cpp/arcticdb/processing/test/test_resample.cpp index c2bfa40043..4279540b01 100644 --- a/cpp/arcticdb/processing/test/test_resample.cpp +++ b/cpp/arcticdb/processing/test/test_resample.cpp @@ -14,7 +14,7 @@ using namespace arcticdb; auto generate_bucket_boundaries(std::vector&& bucket_boundaries) { - return [bucket_boundaries = std::move(bucket_boundaries)](timestamp, timestamp, std::string_view, ResampleBoundary, timestamp) { + return [bucket_boundaries = std::move(bucket_boundaries)](timestamp, timestamp, std::string_view, ResampleBoundary, timestamp, ResampleOrigin) { return bucket_boundaries; }; } @@ -33,7 +33,7 @@ TEST(Resample, StructureForProcessingBasic) { // Insert into vector "out of order" to ensure structure_for_processing reorders correctly std::vector ranges_and_keys{bottom, top}; - ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 1500, 2500, 2999}), 0}; + ResampleClause resample_clause{ "dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 1500, 2500, 2999}), 0, 0 }; auto proc_unit_ids = resample_clause.structure_for_processing(ranges_and_keys); ASSERT_EQ(ranges_and_keys.size(), 2); ASSERT_EQ(ranges_and_keys[0], top); @@ -61,7 +61,7 @@ TEST(Resample, StructureForProcessingColumnSlicing) { // Insert into vector "out of order" to ensure structure_for_processing reorders correctly std::vector ranges_and_keys{top_right, bottom_left, bottom_right, top_left}; - ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 1500, 2500, 2999}), 0}; + ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 1500, 2500, 2999}), 0, 0}; auto proc_unit_ids = resample_clause.structure_for_processing(ranges_and_keys); ASSERT_EQ(ranges_and_keys.size(), 4); ASSERT_EQ(ranges_and_keys[0], top_left); @@ -86,7 +86,7 @@ TEST(Resample, StructureForProcessingOverlap) { // Insert into vector "out of order" to ensure structure_for_processing reorders correctly std::vector ranges_and_keys{bottom, top}; - ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2500, 2999}), 0}; + ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2500, 2999}), 0, 0}; auto proc_unit_ids = resample_clause.structure_for_processing(ranges_and_keys); ASSERT_EQ(ranges_and_keys.size(), 2); ASSERT_EQ(ranges_and_keys[0], top); @@ -113,7 +113,7 @@ TEST(Resample, StructureForProcessingSubsumed) { // Insert into vector "out of order" to ensure structure_for_processing reorders correctly std::vector ranges_and_keys{bottom, middle, top}; - ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 4500}), 0}; + ResampleClause resample_clause{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 4500}), 0, 0}; auto proc_unit_ids = resample_clause.structure_for_processing(ranges_and_keys); ASSERT_EQ(ranges_and_keys.size(), 3); ASSERT_EQ(ranges_and_keys[0], top); @@ -138,7 +138,7 @@ TEST(Resample, StructureForProcessingExactBoundary) { // Insert into vector "out of order" to ensure structure_for_processing reorders correctly std::vector ranges_and_keys{bottom, top}; - ResampleClause resample_clause_left{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2000, 2500, 2999}), 0}; + ResampleClause resample_clause_left{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2000, 2500, 2999}), 0, 0}; auto proc_unit_ids = resample_clause_left.structure_for_processing(ranges_and_keys); ASSERT_EQ(ranges_and_keys.size(), 2); ASSERT_EQ(ranges_and_keys[0], top); @@ -146,7 +146,7 @@ TEST(Resample, StructureForProcessingExactBoundary) { std::vector> expected_proc_unit_ids_left{{0}, {1}}; ASSERT_EQ(expected_proc_unit_ids_left, proc_unit_ids); - ResampleClause resample_clause_right{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2000, 2500, 2999}), 0}; + ResampleClause resample_clause_right{"dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({1, 500, 2000, 2500, 2999}), 0, 0}; proc_unit_ids = resample_clause_right.structure_for_processing(ranges_and_keys); ASSERT_EQ(ranges_and_keys.size(), 2); ASSERT_EQ(ranges_and_keys[0], top); @@ -157,11 +157,11 @@ TEST(Resample, StructureForProcessingExactBoundary) { TEST(Resample, FindBuckets) { // Enough bucket boundaries to test all the interesting cases - ResampleClause resample_left("left", ResampleBoundary::LEFT, generate_bucket_boundaries({0, 10, 20, 30, 40}), 0); - ResampleClause resample_right("right", ResampleBoundary::RIGHT, generate_bucket_boundaries({0, 10, 20, 30, 40}), 0); + ResampleClause resample_left("left", ResampleBoundary::LEFT, generate_bucket_boundaries({0, 10, 20, 30, 40}), 0, 0); + ResampleClause resample_right("right", ResampleBoundary::RIGHT, generate_bucket_boundaries({0, 10, 20, 30, 40}), 0, 0); - resample_left.bucket_boundaries_ = resample_left.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT, 0); - resample_right.bucket_boundaries_ = resample_right.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::RIGHT, 0); + resample_left.bucket_boundaries_ = resample_left.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT, 0, 0); + resample_right.bucket_boundaries_ = resample_right.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::RIGHT, 0, 0); std::vector res; @@ -221,8 +221,8 @@ TEST(Resample, FindBuckets) { TEST(Resample, ProcessOneSegment) { auto component_manager = std::make_shared(); - ResampleClause resample("dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({-1, 2, 5}), 0); - resample.bucket_boundaries_ = resample.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT, 0); + ResampleClause resample("dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({-1, 2, 5}), 0, 0); + resample.bucket_boundaries_ = resample.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT, 0, 0); resample.date_range_ = {0, 5}; resample.set_component_manager(component_manager); resample.set_aggregations({{"sum", "sum_column", "sum_column"}}); @@ -266,8 +266,8 @@ TEST(Resample, ProcessOneSegment) { TEST(Resample, ProcessMultipleSegments) { auto component_manager = std::make_shared(); - ResampleClause resample("dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({-15, -5, 5, 6, 25, 35, 45, 46, 55, 65}), 0); - resample.bucket_boundaries_ = resample.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT, 0); + ResampleClause resample("dummy", ResampleBoundary::LEFT, generate_bucket_boundaries({-15, -5, 5, 6, 25, 35, 45, 46, 55, 65}), 0, 0); + resample.bucket_boundaries_ = resample.generate_bucket_boundaries_(0, 0, "dummy", ResampleBoundary::LEFT, 0, 0); resample.date_range_ = {0, 51}; resample.set_component_manager(component_manager); resample.set_aggregations({{"sum", "sum_column", "sum_column"}}); diff --git a/cpp/arcticdb/python/python_utils.hpp b/cpp/arcticdb/python/python_utils.hpp index 10e33c6838..4c4ea638f0 100644 --- a/cpp/arcticdb/python/python_utils.hpp +++ b/cpp/arcticdb/python/python_utils.hpp @@ -8,7 +8,6 @@ #pragma once #include -#include #include #include #include diff --git a/cpp/arcticdb/version/python_bindings.cpp b/cpp/arcticdb/version/python_bindings.cpp index bb8c8518cb..b839159843 100644 --- a/cpp/arcticdb/version/python_bindings.cpp +++ b/cpp/arcticdb/version/python_bindings.cpp @@ -5,7 +5,7 @@ * As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0. */ -#include +#include #include #include #include @@ -14,7 +14,6 @@ #include #include #include -#include #include #include #include @@ -23,27 +22,81 @@ #include #include #include +#include namespace arcticdb::version_store { +static consteval timestamp one_day_in_nanoseconds() { + return timestamp(24) * 60 * 60 * 1'000'000'000; +} + +template +requires std::integral +[[nodiscard]] static T python_mod(T a, T b) { + return (a % b + b) % b; +} + +/// @param ts in nanoseconds +[[nodiscard]] static timestamp start_of_day_nanoseconds(timestamp ts) { + return ts - python_mod(ts, one_day_in_nanoseconds()); +} + +/// @param ts in nanoseconds +[[nodiscard]] static timestamp end_of_day_nanoseconds(timestamp ts) { + const timestamp start_of_day = start_of_day_nanoseconds(ts); + const bool is_midnnight = start_of_day == ts; + if (is_midnnight) { + return ts; + } + return start_of_day + one_day_in_nanoseconds(); +} + [[nodiscard]] static std::pair compute_first_last_dates( timestamp start, timestamp end, timestamp rule, ResampleBoundary closed_boundary_arg, - timestamp offset + timestamp offset, + const ResampleOrigin& origin ) { - const timestamp ns_to_prev_offset_start = (start - offset) % rule; - const timestamp ns_to_prev_offset_end = (end - offset) % rule; + // Origin value formula from Pandas: + // https://github.com/pandas-dev/pandas/blob/68d9dcab5b543adb3bfe5b83563c61a9b8afae77/pandas/core/resample.py#L2564 + auto [origin_ns, origin_adjusted_start] = util::variant_match( + origin, + [start](timestamp o) -> std::pair {return {o, start}; }, + [&](const std::string& o) -> std::pair { + if (o == "epoch") { + return { 0, start }; + } else if (o == "start") { + return { start, start }; + } else if (o == "start_day") { + return { start_of_day_nanoseconds(start), start }; + } else if (o == "end_day" || o == "end") { + const timestamp origin_last = o == "end" ? end: end_of_day_nanoseconds(end); + const timestamp bucket_count = (origin_last - start) / rule + (closed_boundary_arg == ResampleBoundary::LEFT); + const timestamp origin_ns = origin_last - bucket_count * rule; + return { origin_ns, origin_ns }; + } else { + user_input::raise( + "Invalid origin value {}. Supported values are: \"start\", \"start_day\", \"end\", \"end_day\", \"epoch\" or timestamp in nanoseconds", + o); + } + } + ); + origin_ns += offset; + + const timestamp ns_to_prev_offset_start = python_mod(origin_adjusted_start - origin_ns, rule); + const timestamp ns_to_prev_offset_end = python_mod(end - origin_ns, rule); + if (closed_boundary_arg == ResampleBoundary::RIGHT) { return { - ns_to_prev_offset_start > 0 ? start - ns_to_prev_offset_start : start - rule, + ns_to_prev_offset_start > 0 ? origin_adjusted_start - ns_to_prev_offset_start : origin_adjusted_start - rule, ns_to_prev_offset_end > 0 ? end + (rule - ns_to_prev_offset_end) : end }; } else { return { - ns_to_prev_offset_start > 0 ? start - ns_to_prev_offset_start : start, + ns_to_prev_offset_start > 0 ? origin_adjusted_start - ns_to_prev_offset_start : origin_adjusted_start, ns_to_prev_offset_end > 0 ? end + (rule - ns_to_prev_offset_end) : end + rule }; } @@ -54,14 +107,14 @@ std::vector generate_buckets( timestamp end, std::string_view rule, ResampleBoundary closed_boundary_arg, - timestamp offset + timestamp offset, + const ResampleOrigin& origin ) { - timestamp rule_ns; - { + const timestamp rule_ns = [](std::string_view rule) { py::gil_scoped_acquire acquire_gil; - rule_ns = python_util::pd_to_offset(rule); - } - const auto [start_with_offset, end_with_offset] = compute_first_last_dates(start, end, rule_ns, closed_boundary_arg, offset); + return python_util::pd_to_offset(rule); + }(rule); + const auto [start_with_offset, end_with_offset] = compute_first_last_dates(start, end, rule_ns, closed_boundary_arg, offset, origin); const auto bucket_boundary_count = (end_with_offset - start_with_offset) / rule_ns + 1; std::vector res; res.reserve(bucket_boundary_count); @@ -73,16 +126,10 @@ std::vector generate_buckets( template void declare_resample_clause(py::module& version) { - std::string class_name; - if constexpr (closed_boundary == ResampleBoundary::LEFT) { - class_name = "ResampleClauseLeftClosed"; - } else { - // closed_boundary == ResampleBoundary::RIGHT - class_name = "ResampleClauseRightClosed"; - } - py::class_, std::shared_ptr>>(version, class_name.c_str()) - .def(py::init([](std::string rule, ResampleBoundary label_boundary, timestamp offset){ - return ResampleClause(rule, label_boundary, generate_buckets, offset); + const char* class_name = closed_boundary == ResampleBoundary::LEFT ? "ResampleClauseLeftClosed" : "ResampleClauseRightClosed"; + py::class_, std::shared_ptr>>(version, class_name) + .def(py::init([](std::string rule, ResampleBoundary label_boundary, timestamp offset, ResampleOrigin origin){ + return ResampleClause(std::move(rule), label_boundary, generate_buckets, offset, std::move(origin)); })) .def_property_readonly("rule", &ResampleClause::rule) .def("set_aggregations", [](ResampleClause& self, diff --git a/python/arcticdb/util/test.py b/python/arcticdb/util/test.py index 61440572a6..fdc922665c 100644 --- a/python/arcticdb/util/test.py +++ b/python/arcticdb/util/test.py @@ -35,6 +35,7 @@ from arcticdb.version_store.helper import ArcticFileConfig from arcticdb.config import _DEFAULT_ENVS_PATH from arcticdb_ext import set_config_int, get_config_int, unset_config_int +from packaging.version import Version from arcticdb import log @@ -756,3 +757,80 @@ def generic_named_aggregation_test(lib, symbol, df, grouping_column, aggs_dict): f"""\nPandas result:\n{expected}\n"ArcticDB result:\n{received}""" ) raise e + +def drop_inf_and_nan(df: pd.DataFrame) -> pd.DataFrame: + return df[~df.isin([np.nan, np.inf, -np.inf]).any(axis=1)] + + +def assert_dfs_approximate(left: pd.DataFrame, right: pd.DataFrame): + """ + Checks if integer columns are exactly the same. For float columns checks if they are approximately the same. + We can't guarantee the same order of operations for the floats thus numerical errors might appear. + """ + assert left.shape == right.shape + assert left.columns.equals(right.columns) + # To avoid checking the freq member of the index as arctic does not fill it in + assert left.index.equals(right.index) + + # Drop NaN an inf values because. Pandas uses Kahan summation algorithm to improve numerical stability. + # Thus they don't consistently overflow to infinity. Discussion: https://github.com/pandas-dev/pandas/issues/60303 + left_no_inf_and_nan = drop_inf_and_nan(left) + right_no_inf_and_nan = drop_inf_and_nan(right) + + check_equals_flags = {"check_dtype": False} + if PANDAS_VERSION >= Version("1.1"): + check_equals_flags["check_freq"] = False + if PANDAS_VERSION >= Version("1.2"): + check_equals_flags["check_flags"] = False + for col in left_no_inf_and_nan.columns: + if pd.api.types.is_integer_dtype(left_no_inf_and_nan[col].dtype) and pd.api.types.is_integer_dtype(right_no_inf_and_nan[col].dtype): + pd.testing.assert_series_equal(left_no_inf_and_nan[col], right_no_inf_and_nan[col], **check_equals_flags) + else: + if PANDAS_VERSION >= Version("1.1"): + check_equals_flags["atol"] = 1e-8 + pd.testing.assert_series_equal(left_no_inf_and_nan[col], right_no_inf_and_nan[col], **check_equals_flags) + + +def generic_resample_test(lib, sym, rule, aggregations, date_range=None, closed=None, label=None, offset=None, origin=None, drop_empty_buckets_for=None): + """ + Perform a resampling in ArcticDB and compare it against the same query in Pandas. + + :param drop_empty_buckets_for: Will add additional aggregation column using the count aggregator. At the end of the + aggregation query will remove all rows for which this newly added count aggregation is 0. Works only for int/uint + columns. There is similar function generic_resample_test_with_empty_buckets in + python/tests/unit/arcticdb/version_store/test_resample.py which can drop empty buckets for all types of columns, + but it cannot take parameters such as origin and offset. + """ + # Pandas doesn't have a good date_range equivalent in resample, so just use read for that + expected = lib.read(sym, date_range=date_range).data + # Pandas 1.X needs None as the first argument to agg with named aggregators + + pandas_aggregations = {**aggregations, "_bucket_size_": (drop_empty_buckets_for, "count")} if drop_empty_buckets_for else aggregations + resample_args = {} + if origin: + resample_args['origin'] = origin + if offset: + resample_args['offset'] = offset + + if PANDAS_VERSION >= Version("1.1.0"): + expected = expected.resample(rule, closed=closed, label=label, **resample_args).agg(None, **pandas_aggregations) + else: + expected = expected.resample(rule, closed=closed, label=label).agg(None, **pandas_aggregations) + if drop_empty_buckets_for: + expected = expected[expected["_bucket_size_"] > 0] + expected.drop(columns=["_bucket_size_"], inplace=True) + expected = expected.reindex(columns=sorted(expected.columns)) + + q = QueryBuilder() + if origin: + q = q.resample(rule, closed=closed, label=label, offset=offset, origin=origin).agg(aggregations) + else: + q = q.resample(rule, closed=closed, label=label, offset=offset).agg(aggregations) + received = lib.read(sym, date_range=date_range, query_builder=q).data + received = received.reindex(columns=sorted(received.columns)) + + has_float_column = any(pd.api.types.is_float_dtype(col_type) for col_type in list(expected.dtypes)) + if has_float_column: + assert_dfs_approximate(expected, received) + else: + assert_frame_equal(expected, received, check_dtype=False) \ No newline at end of file diff --git a/python/arcticdb/version_store/processing.py b/python/arcticdb/version_store/processing.py index 46a821d5fe..5f77cda66d 100644 --- a/python/arcticdb/version_store/processing.py +++ b/python/arcticdb/version_store/processing.py @@ -320,6 +320,7 @@ class PythonResampleClause: aggregations: Dict[str, Union[str, Tuple[str, str]]] = None # In nanosecods offset: int = 0 + origin: Union[str, pd.Timestamp] = "epoch" class QueryBuilder: @@ -590,7 +591,8 @@ def resample( rule: Union[str, pd.DateOffset], closed: Optional[str] = None, label: Optional[str] = None, - offset: Optional[Union[str, pd.Timedelta]] = None + offset: Optional[Union[str, pd.Timedelta]] = None, + origin: Union[str, pd.Timestamp] = 'epoch' ): """ Resample a symbol on the index. The symbol must be datetime indexed. Resample operations must be followed by @@ -637,7 +639,16 @@ def resample( offset: Optional[Union[str, pd.Timedelta]] default=None Offset the start of each bucket. Supported strings are the same as in `pd.Timedelta`. If offset is larger than rule then `offset` modulo `rule` is used as an offset. + origin: Optional[Union[str, pd.Timestamp]] default='epoch' + The timestamp on which to adjust the grouping. Supported string are: + * epoch: origin is 1970-01-01 + * start: origin is the first value of the timeseries + * start_day: origin is the first day at midnight of the timeseries + * end: origin is the last value of the timeseries + * end_day: origin is the ceiling midnight of the last day + + `start`, `start_day`, `end`, `end_day` origin values are not supported in conjunction with `date_range`. Returns ------- QueryBuilder @@ -658,6 +669,10 @@ def resample( * The library has dynamic schema enabled, and at least one of the columns being aggregated is missing from at least one row-slice. * At least one of the columns being aggregated contains sparse data. + UserInputException + + * `start`, `start_day`, `end`, `end_day` is used in conjunction with `date_range` + * `origin` is not one of `start`, `start_day`, `end`, `end_day`, `epoch` or a `pd.Timestamp` Examples -------- @@ -742,6 +757,10 @@ def resample( else: offset_ns = 0 + if not (isinstance(origin, pd.Timestamp) or origin in ["start", "end", "start_day", "end_day", "epoch"]): + raise UserInputException(f'Argument origin must be either of type pd.Timestamp or one of ["start", "end", "start_day", "end_day", "epoch"]. Got {offset} instead') + if type(origin) is pd.Timestamp: + origin = origin.value # This set is documented here: # https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.Series.resample.html#pandas.Series.resample # and lifted directly from pandas.core.resample.TimeGrouper.__init__, and so is inherently fragile to upstream @@ -750,15 +769,15 @@ def resample( boundary_map = { "left": _ResampleBoundary.LEFT, "right": _ResampleBoundary.RIGHT, - None: _ResampleBoundary.RIGHT if rule in end_types else _ResampleBoundary.LEFT + None: _ResampleBoundary.RIGHT if rule in end_types or origin in ["end", "end_day"] else _ResampleBoundary.LEFT } check(closed in boundary_map.keys(), f"closed kwarg to resample must be `left`, 'right', or None, but received '{closed}'") check(label in boundary_map.keys(), f"label kwarg to resample must be `left`, 'right', or None, but received '{closed}'") if boundary_map[closed] == _ResampleBoundary.LEFT: - self.clauses = self.clauses + [_ResampleClauseLeftClosed(rule, boundary_map[label], offset_ns)] + self.clauses = self.clauses + [_ResampleClauseLeftClosed(rule, boundary_map[label], offset_ns, origin)] else: - self.clauses = self.clauses + [_ResampleClauseRightClosed(rule, boundary_map[label], offset_ns)] - self._python_clauses = self._python_clauses + [PythonResampleClause(rule=rule, closed=boundary_map[closed], label=boundary_map[label], offset=offset_ns)] + self.clauses = self.clauses + [_ResampleClauseRightClosed(rule, boundary_map[label], offset_ns, origin)] + self._python_clauses = self._python_clauses + [PythonResampleClause(rule=rule, closed=boundary_map[closed], label=boundary_map[label], offset=offset_ns, origin=origin)] return self @@ -936,9 +955,9 @@ def __setstate__(self, state): self.clauses = self.clauses + [_AggregationClause(self.clauses[-1].grouping_column, python_clause.aggregations)] elif isinstance(python_clause, PythonResampleClause): if python_clause.closed == _ResampleBoundary.LEFT: - self.clauses = self.clauses + [_ResampleClauseLeftClosed(python_clause.rule, python_clause.label, python_clause.offset)] + self.clauses = self.clauses + [_ResampleClauseLeftClosed(python_clause.rule, python_clause.label, python_clause.offset, python_clause.origin)] else: - self.clauses = self.clauses + [_ResampleClauseRightClosed(python_clause.rule, python_clause.label, python_clause.offset)] + self.clauses = self.clauses + [_ResampleClauseRightClosed(python_clause.rule, python_clause.label, python_clause.offset, python_clause.origin)] if python_clause.aggregations is not None: self.clauses[-1].set_aggregations(python_clause.aggregations) elif isinstance(python_clause, PythonRowRangeClause): diff --git a/python/tests/hypothesis/arcticdb/test_resample.py b/python/tests/hypothesis/arcticdb/test_resample.py new file mode 100644 index 0000000000..e263211e92 --- /dev/null +++ b/python/tests/hypothesis/arcticdb/test_resample.py @@ -0,0 +1,110 @@ +import pandas as pd +import numpy as np +import pytest +from hypothesis import given, settings, assume +import hypothesis.extra.pandas as hs_pd +import hypothesis.extra.numpy as hs_np +import hypothesis.strategies as st +from arcticdb.util.hypothesis import use_of_function_scoped_fixtures_in_hypothesis_checked +from arcticdb import QueryBuilder +from arcticdb.util.test import assert_frame_equal, generic_resample_test +from arcticdb.util._versions import IS_PANDAS_TWO + + +COLUMN_DTYPE = ["float", "int", "uint"] +ALL_AGGREGATIONS = ["sum", "mean", "min", "max", "first", "last", "count"] +MIN_DATE = np.datetime64('1969-01-01') +MAX_DATE = np.datetime64('2000-01-01') + +@st.composite +def date(draw, min_date, max_date): + # Bound the start and end date so that we don't end up with too many buckets eating all RAM + # Use some pre-epoch dates. + # hs_np.from_dtype's min_value and max_value do not work with dates + res = draw(hs_np.from_dtype(np.dtype("datetime64[ns]"))) + assume(min_date <= res and res <= max_date) + return res + +@st.composite +def dataframe(draw): + index = hs_pd.indexes(elements=date(min_date=MIN_DATE, max_date=MAX_DATE).filter(lambda d: d is not pd.NaT), min_size=1) + columns = [hs_pd.column(name=f"col_{dtype}", dtype=dtype) for dtype in COLUMN_DTYPE] + result = draw(hs_pd.data_frames(columns, index=index)) + result.sort_index(inplace=True) + return result + +@st.composite +def origin(draw): + selected_origin = draw(st.sampled_from(["start", "end", "start_day", "end_day", "epoch", "timestamp"])) + # Hypothesis may generate dates for year > 2200 and some of the arithmetic operation will overflow. + if selected_origin == "timestamp": + min_date = MIN_DATE - np.timedelta64(365, 'D') + max_date = MAX_DATE + np.timedelta64(365, 'D') + return pd.Timestamp(draw(date(min_date=min_date, max_date=max_date))) + else: + return selected_origin + +def freq_fits_in_64_bits(count, unit): + """ + This is used to check if a frequency is usable by Arctic. ArcticDB converts the frequency to signed 64 bit integer. + """ + billion = 1_000_000_000 + mult = {'h': 3600 * billion, 'min': 60 * billion, 's': billion} + return (mult[unit] * count).bit_length() <= 63 + +@st.composite +def rule(draw): + count = draw(st.integers(min_value=1)) + unit = draw(st.sampled_from(['min', 'h'])) + result = f"{count}{unit}" + assume(freq_fits_in_64_bits(count=count, unit=unit)) + return result + +@st.composite +def offset(draw): + unit = draw(st.sampled_from(['s', 'min', 'h', None])) + if unit is None: + return None + count = draw(st.integers(min_value=1)) + result = f"{count}{unit}" + assume(freq_fits_in_64_bits(count=count, unit=unit)) + return result + + +@pytest.mark.skipif(not IS_PANDAS_TWO, reason="Some resampling parameters don't exist in Pandas < 2") +@use_of_function_scoped_fixtures_in_hypothesis_checked +@settings(deadline=None) +@given( + df=dataframe(), + rule=rule(), + origin=origin(), + offset=offset() +) +def test_resample(lmdb_version_store_v1, df, rule, origin, offset): + lib = lmdb_version_store_v1 + sym = "sym" + lib.write(sym, df) + for closed in ["left", "right"]: + for label in ["left", "right"]: + columns = list(df.columns) + agg = {f"{name}_{op}": (name, op) for name in columns for op in ALL_AGGREGATIONS} + try: + generic_resample_test( + lib, + sym, + rule, + agg, + origin=origin, + offset=offset, + closed=closed, + label=label, + # Must be int or uint column otherwise dropping of empty buckets will not work + drop_empty_buckets_for="col_uint") + except ValueError as pandas_error: + # This is to avoid a bug in pandas related to how end an end_day work. It's possible that when end/end_day are used + # the first value of the data frame to be outside of the computed resampling range. In arctic this is not a problem + # as we allow this by design. + if str(pandas_error) != "Values falls before first bin": + raise pandas_error + else: + return \ No newline at end of file diff --git a/python/tests/hypothesis/arcticdb/test_sort_merge.py b/python/tests/hypothesis/arcticdb/test_sort_merge.py index d983ca97dd..2417d1bf08 100644 --- a/python/tests/hypothesis/arcticdb/test_sort_merge.py +++ b/python/tests/hypothesis/arcticdb/test_sort_merge.py @@ -38,7 +38,7 @@ def generate_single_dataframe(draw, column_list, min_size=0, allow_nat_in_index= if not IS_PANDAS_TWO: # Due to https://github.com/man-group/ArcticDB/blob/7479c0b0caa8121bc2ca71a73e29769bbc41c66a/python/arcticdb/version_store/_normalization.py#L184 # we change the dtype of empty float columns. This makes hypothesis tests extremely hard to write as we must - # keep addional state about is there a mix of empty/non-empty float columns in the staging area, did we write + # keep additional state about is there a mix of empty/non-empty float columns in the staging area, did we write # empty float column (if so it's type would be object). These edge cases are covered in the unit tests. index = hs_pd.indexes(dtype="datetime64[ns]", min_size=1 if min_size <= 0 else min_size).filter(lambda x: allow_nat_in_index or not pd.NaT in x) else: diff --git a/python/tests/unit/arcticdb/version_store/test_resample.py b/python/tests/unit/arcticdb/version_store/test_resample.py index 965f679340..6b41090f87 100644 --- a/python/tests/unit/arcticdb/version_store/test_resample.py +++ b/python/tests/unit/arcticdb/version_store/test_resample.py @@ -12,16 +12,20 @@ import pytest from arcticdb import QueryBuilder -from arcticdb.exceptions import ArcticDbNotYetImplemented, SchemaException -from arcticdb.util.test import assert_frame_equal +from arcticdb.exceptions import ArcticDbNotYetImplemented, SchemaException, UserInputException +from arcticdb.util.test import assert_frame_equal, generic_resample_test from packaging.version import Version from arcticdb.util._versions import IS_PANDAS_TWO, PANDAS_VERSION +import itertools pytestmark = pytest.mark.pipeline ALL_AGGREGATIONS = ["sum", "mean", "min", "max", "first", "last", "count"] +def all_aggregations_dict(col): + return {f"to_{agg}": (col, agg) for agg in ALL_AGGREGATIONS} + # Pandas recommended way to resample and exclude buckets with no index values, which is our behaviour # See https://pandas.pydata.org/pandas-docs/stable/user_guide/timeseries.html#sparse-resampling def round(t, freq): @@ -30,31 +34,22 @@ def round(t, freq): return pd.Timestamp((t.value // td.value) * td.value) def generic_resample_test_with_empty_buckets(lib, sym, rule, aggregations, date_range=None): - # Pandas doesn't have a good date_range equivalent in resample, so just use read for that - expected = lib.read(sym, date_range=date_range).data - # Pandas 1.X needs None as the first argument to agg with named aggregators - expected = expected.groupby(partial(round, freq=rule)).agg(None, **aggregations) - expected = expected.reindex(columns=sorted(expected.columns)) - - q = QueryBuilder() - q = q.resample(rule).agg(aggregations) - received = lib.read(sym, date_range=date_range, query_builder=q).data - received = received.reindex(columns=sorted(received.columns)) + """ + Perform a resampling in ArcticDB and compare it against the same query in Pandas. - assert_frame_equal(expected, received, check_dtype=False) + This will remove all empty buckets mirroring ArcticDB's behavior. It cannot take additional parameters such as + orign and offset. In case such parameters are needed arcticdb.util.test.generic_resample_test can be used. -def generic_resample_test(lib, sym, rule, aggregations, date_range=None, closed=None, label=None, offset=None): + This can drop buckets even all columns are of float type while generic_resample_test needs at least one non-float + column. + """ # Pandas doesn't have a good date_range equivalent in resample, so just use read for that expected = lib.read(sym, date_range=date_range).data # Pandas 1.X needs None as the first argument to agg with named aggregators - if PANDAS_VERSION >= Version("1.1.0"): - expected = expected.resample(rule, closed=closed, label=label, offset=offset).agg(None, **aggregations) - else: - expected = expected.resample(rule, closed=closed, label=label).agg(None, **aggregations) + expected = expected.groupby(partial(round, freq=rule)).agg(None, **aggregations) expected = expected.reindex(columns=sorted(expected.columns)) - q = QueryBuilder() - q = q.resample(rule, closed=closed, label=label, offset=offset).agg(aggregations) + q = q.resample(rule).agg(aggregations) received = lib.read(sym, date_range=date_range, query_builder=q).data received = received.reindex(columns=sorted(received.columns)) @@ -548,10 +543,6 @@ def test_resampling_empty_type_column(lmdb_version_store_empty_types_v1): @pytest.mark.parametrize("closed", ["left", "right"]) class TestResamplingOffset: - @staticmethod - def all_aggregations_dict(col): - return {f"to_{agg}": (col, agg) for agg in ALL_AGGREGATIONS} - @pytest.mark.parametrize("offset", ("30s", pd.Timedelta(seconds=30))) def test_offset_smaller_than_freq(self, lmdb_version_store_v1, closed, offset): lib = lmdb_version_store_v1 @@ -564,7 +555,7 @@ def test_offset_smaller_than_freq(self, lmdb_version_store_v1, closed, offset): lib, sym, "2min", - self.all_aggregations_dict("col"), + all_aggregations_dict("col"), closed=closed, offset="30s" ) @@ -581,7 +572,7 @@ def test_offset_larger_than_freq(self, lmdb_version_store_v1, closed, offset): lib, sym, "2min", - self.all_aggregations_dict("col"), + all_aggregations_dict("col"), closed=closed, offset=offset ) @@ -603,7 +594,7 @@ def test_values_on_offset_boundary(self, lmdb_version_store_v1, closed, offset): lib, sym, "2min", - self.all_aggregations_dict("col"), + all_aggregations_dict("col"), closed=closed, offset=offset ) @@ -630,8 +621,226 @@ def test_with_date_range(self, lmdb_version_store_v1, closed, date_range, offset lib, sym, "2min", - self.all_aggregations_dict("col"), + all_aggregations_dict("col"), closed=closed, offset=offset, date_range=date_range - ) \ No newline at end of file + ) + +@pytest.mark.skipif(PANDAS_VERSION < Version("1.1.0"), reason="Pandas < 1.1.0 do not have offset param") +@pytest.mark.parametrize("closed", ["left", "right"]) +class TestResamplingOrigin: + + # Timestamps: pre start, between start and end, post end, first date in the index, last date in the index + @pytest.mark.parametrize( + "origin", + [ + "start", + "start_day", + pytest.param("end", marks=pytest.mark.skipif(PANDAS_VERSION < Version("1.3.0"), reason="Not supported")), + pytest.param("end_day", marks=pytest.mark.skipif(PANDAS_VERSION < Version("1.3.0"), reason="Not supported")), + "epoch", + pd.Timestamp("2024-01-01"), + pd.Timestamp("2025-01-01 15:00:00"), + pd.Timestamp("2025-01-03 15:00:00"), + pd.Timestamp("2025-01-01 10:00:33"), + pd.Timestamp("2025-01-02 12:00:13") + ] + ) + def test_origin(self, lmdb_version_store_v1, closed, origin): + lib = lmdb_version_store_v1 + sym = "test_origin_special_values" + # Start and end are picked so that #bins * rule + start != end on purpose to test + # the bin generation in case of end and end_day + start = pd.Timestamp("2025-01-01 10:00:33") + end = pd.Timestamp("2025-01-02 12:00:20") + idx = pd.date_range(start, end, freq='10s') + rng = np.random.default_rng() + df = pd.DataFrame({"col": rng.integers(0, 100, len(idx))}, index=idx) + lib.write(sym, df) + generic_resample_test( + lib, + sym, + "2min", + all_aggregations_dict("col"), + closed=closed, + origin=origin + ) + + @pytest.mark.parametrize("origin", [ + "start", + "start_day", + pytest.param("end", marks=pytest.mark.skipif(PANDAS_VERSION < Version("1.3.0"), reason="Not supported")), + pytest.param("end_day", marks=pytest.mark.skipif(PANDAS_VERSION < Version("1.3.0"), reason="Not supported")) + ]) + @pytest.mark.parametrize("date_range", [ + (pd.Timestamp("2025-01-01 10:00:00"), pd.Timestamp("2025-01-02 12:00:00")), # start and end are multiples of rule + (pd.Timestamp("2025-01-01 10:00:00"), pd.Timestamp("2025-01-02 12:00:03")), # start is multiple of rule + (pd.Timestamp("2025-01-01 10:00:03"), pd.Timestamp("2025-01-02 12:00:00")) # end is multiple of rule + ]) + def test_origin_is_multiple_of_freq(self, lmdb_version_store_v1, closed, origin, date_range): + lib = lmdb_version_store_v1 + sym = "test_origin_special_values" + start, end = date_range + idx = pd.date_range(start, end, freq='10s') + rng = np.random.default_rng() + df = pd.DataFrame({"col": rng.integers(0, 100, len(idx))}, index=idx) + lib.write(sym, df) + generic_resample_test( + lib, + sym, + "2min", + all_aggregations_dict("col"), + closed=closed, + origin=origin, + drop_empty_buckets_for="col" + ) + + @pytest.mark.parametrize("origin", [ + "start", + "start_day", + pytest.param("end", marks=pytest.mark.skipif(PANDAS_VERSION < Version("1.3.0"), reason="Not supported")), + pytest.param("end_day", marks=pytest.mark.skipif(PANDAS_VERSION < Version("1.3.0"), reason="Not supported")), + "epoch" + ]) + def test_pre_epoch_data(self, lmdb_version_store_v1, closed, origin): + lib = lmdb_version_store_v1 + sym = "test_origin_special_values" + start = pd.Timestamp("1800-01-01 10:00:00") + end = pd.Timestamp("1800-01-02 10:00:00") + idx = pd.date_range(start, end, freq='30s') + rng = np.random.default_rng() + df = pd.DataFrame({"col": rng.integers(0, 100, len(idx))}, index=idx) + lib.write(sym, df) + generic_resample_test( + lib, + sym, + "2min", + all_aggregations_dict("col"), + closed=closed, + origin=origin, + drop_empty_buckets_for="col" + ) + + @pytest.mark.parametrize("origin", [ + "start", + "start_day", + pytest.param("end", marks=pytest.mark.skipif(PANDAS_VERSION < Version("1.3.0"), reason="Not supported")), + pytest.param("end_day", marks=pytest.mark.skipif(PANDAS_VERSION < Version("1.3.0"), reason="Not supported")), + ]) + @pytest.mark.parametrize("date_range", + list(itertools.product( + [pd.Timestamp("2024-01-01") - pd.Timedelta(1), pd.Timestamp("2024-01-01") + pd.Timedelta(1)], + [pd.Timestamp("2024-01-02") - pd.Timedelta(1), pd.Timestamp("2024-01-02") + pd.Timedelta(1)])) + ) + def test_origin_off_by_one_on_boundary(self, lmdb_version_store_v1, closed, origin, date_range): + lib = lmdb_version_store_v1 + sym = "test_origin_special_values" + start, end = date_range + idx = pd.date_range(start, end, freq='10s') + rng = np.random.default_rng() + df = pd.DataFrame({"col": rng.integers(0, 100, len(idx))}, index=idx) + lib.write(sym, df) + generic_resample_test( + lib, + sym, + "2min", + all_aggregations_dict("col"), + closed=closed, + origin=origin, + drop_empty_buckets_for="col" + ) + + @pytest.mark.parametrize("origin", [ + "start_day", + "start", + pytest.param("end", marks=pytest.mark.skipif(PANDAS_VERSION < Version("1.3.0"), reason="Not supported")), + pytest.param("end_day", marks=pytest.mark.skipif(PANDAS_VERSION < Version("1.3.0"), reason="Not supported")) + ]) + def test_non_epoch_origin_throws_with_daterange(self, lmdb_version_store_v1, origin, closed): + lib = lmdb_version_store_v1 + sym = "test_origin_start_throws_with_daterange" + + lib.write(sym, pd.DataFrame({"col": [1, 2, 3]}, index=pd.DatetimeIndex([pd.Timestamp("2024-01-01"), pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-03")]))) + q = QueryBuilder() + q = q.resample('1min', origin=origin, closed=closed).agg({"col_min":("col", "min")}) + with pytest.raises(UserInputException) as exception_info: + lib.read(sym, query_builder=q, date_range=(pd.Timestamp("2024-01-02"), pd.Timestamp("2024-01-03"))) + assert all(w in str(exception_info.value) for w in [origin, "origin"]) + + @pytest.mark.parametrize("origin", ["epoch", pd.Timestamp("2025-01-03 12:00:00")]) + def test_epoch_and_ts_origin_works_with_date_range(self, lmdb_version_store_v1, closed, origin): + lib = lmdb_version_store_v1 + sym = "test_origin_special_values" + # Start and end are picked so that #bins * rule + start != end on purpose to test + # the bin generation in case of end and end_day + start = pd.Timestamp("2025-01-01 00:00:00") + end = pd.Timestamp("2025-01-04 00:00:00") + idx = pd.date_range(start, end, freq='3s') + rng = np.random.default_rng() + df = pd.DataFrame({"col": rng.integers(0, 100, len(idx))}, index=idx) + lib.write(sym, df) + generic_resample_test( + lib, + sym, + "2min", + all_aggregations_dict("col"), + closed=closed, + origin=origin, + date_range=(pd.Timestamp("2025-01-02 00:00:00"), pd.Timestamp("2025-01-03 00:00:00")) + ) + +@pytest.mark.skipif(PANDAS_VERSION < Version("1.1.0"), reason="Pandas < 1.1.0 do not have offset param") +@pytest.mark.parametrize("closed", ["left", "right"]) +@pytest.mark.parametrize("label", ["left", "right"]) +@pytest.mark.parametrize("origin",[ + "start", + "start_day", + pytest.param("end", marks=pytest.mark.skipif(PANDAS_VERSION < Version("1.3.0"), reason="Not supported")), + pytest.param("end_day", marks=pytest.mark.skipif(PANDAS_VERSION < Version("1.3.0"), reason="Not supported")), + "epoch", + pd.Timestamp("2024-01-01"), + pd.Timestamp("2025-01-01 15:00:00"), + pd.Timestamp("2025-01-03 15:00:00") +]) +@pytest.mark.parametrize("offset", ['10s', '13s', '2min']) +def test_origin_offset_combined(lmdb_version_store_v1, closed, origin, label, offset): + lib = lmdb_version_store_v1 + sym = "test_origin_special_values" + # Start and end are picked so that #bins * rule + start != end on purpose to test + # the bin generation in case of end and end_day + start = pd.Timestamp("2025-01-01 10:00:33") + end = pd.Timestamp("2025-01-02 12:00:20") + idx = pd.date_range(start, end, freq='10s') + rng = np.random.default_rng() + df = pd.DataFrame({"col": range(len(idx))}, index=idx) + lib.write(sym, df) + generic_resample_test( + lib, + sym, + "2min", + all_aggregations_dict("col"), + closed=closed, + origin=origin, + drop_empty_buckets_for="col", + label=label, + offset=offset + ) + +def test_max_with_one_infinity_element(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym = "test_max_with_one_infinity_element" + + lib.write(sym, pd.DataFrame({"col": [np.inf]}, index=pd.DatetimeIndex([pd.Timestamp("2024-01-01")]))) + q = QueryBuilder() + q = q.resample('1min').agg({"col_max":("col", "max")}) + assert np.isinf(lib.read(sym, query_builder=q).data['col_max'][0]) + +def test_min_with_one_infinity_element(lmdb_version_store_v1): + lib = lmdb_version_store_v1 + sym = "test_min_with_one_infinity_element" + + lib.write(sym, pd.DataFrame({"col": [-np.inf]}, index=pd.DatetimeIndex([pd.Timestamp("2024-01-01")]))) + q = QueryBuilder() + q = q.resample('1min').agg({"col_min":("col", "min")}) + assert np.isneginf(lib.read(sym, query_builder=q).data['col_min'][0]) \ No newline at end of file