Skip to content

Commit

Permalink
Rename mc::range to mc::block, better bucket scaling
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisspyB committed Nov 20, 2024
1 parent 7214a80 commit 689f6b4
Show file tree
Hide file tree
Showing 13 changed files with 128 additions and 81 deletions.
6 changes: 3 additions & 3 deletions src/gribjump/GribJumpDataAccessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ namespace gribjump {
class GribJumpDataAccessor : public mc::DataAccessor {

public:
GribJumpDataAccessor(eckit::DataHandle& dh, const mc::Range range) : dh_{dh}, data_section_range_{range} {}
GribJumpDataAccessor(eckit::DataHandle& dh, const mc::Block range) : dh_{dh}, data_section_range_{range} {}

void write(const eckit::Buffer& buffer, const size_t offset) const override {
NOTIMP;
}

eckit::Buffer read(const mc::Range& range) const override {
eckit::Buffer read(const mc::Block& range) const override {
eckit::Offset offset = range.first;
eckit::Length size = range.second;

Expand All @@ -51,7 +51,7 @@ class GribJumpDataAccessor : public mc::DataAccessor {

private:
eckit::DataHandle& dh_;
mc::Range data_section_range_;
mc::Block data_section_range_;
};

} // namespace gribjump
6 changes: 3 additions & 3 deletions src/gribjump/compression/DataAccessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class DataAccessor {
public:
virtual ~DataAccessor() = default;
virtual void write(const eckit::Buffer& buffer, const size_t offset) const = 0;
virtual eckit::Buffer read(const Range& range) const = 0;
virtual eckit::Buffer read(const Block& range) const = 0;
virtual eckit::Buffer read() const = 0;
virtual size_t eof() const = 0;
};
Expand All @@ -43,7 +43,7 @@ class PosixAccessor : public DataAccessor {
NOTIMP;
}

eckit::Buffer read(const Range& range) const override {
eckit::Buffer read(const Block& range) const override {
const auto [offset, size] = range;
eckit::Buffer buf(size);
ifs_.seekg(offset, std::ios::beg);
Expand Down Expand Up @@ -89,7 +89,7 @@ class MemoryAccessor : public DataAccessor {
NOTIMP;
}

eckit::Buffer read(const Range& range) const override {
eckit::Buffer read(const Block& range) const override {
const auto [offset, size] = range;
if (offset + size > buf_.size()) {
std::stringstream ss;
Expand Down
10 changes: 5 additions & 5 deletions src/gribjump/compression/NumericCompressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,23 @@ class NumericDecompressor {
using CompressedData = eckit::Buffer;
using Values = std::vector<ValueType>;
virtual Values decode(const CompressedData&) = 0;
virtual Values decode(const std::shared_ptr<DataAccessor>, const Range&) = 0;
virtual Values decode(const std::shared_ptr<DataAccessor>, const Block&) = 0;


virtual std::vector<Values> decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Range>& ranges) {
virtual std::vector<Values> decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Block>& ranges) {
using Values = typename NumericDecompressor<ValueType>::Values;
std::vector<Values> result;
decode(accessor, ranges, result);
return result;
}

virtual void decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Range>& ranges, std::vector<Values>& result) {
virtual void decode(const std::shared_ptr<DataAccessor>& accessor, const std::vector<mc::Block>& ranges, std::vector<Values>& result) {
using Values = typename NumericDecompressor<ValueType>::Values;

std::unordered_map<Range, std::pair<Range, std::shared_ptr<Values>>> ranges_map;
std::unordered_map<Block, std::pair<Block, std::shared_ptr<Values>>> ranges_map;

// find which sub_ranges are in which buckets
RangeBuckets buckets;
BlockBuckets buckets;
for (const auto& range : ranges) {
buckets << range;
}
Expand Down
94 changes: 51 additions & 43 deletions src/gribjump/compression/Range.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,31 +12,31 @@
#include <algorithm>
#include <cassert>

std::pair<size_t, size_t> begin_end(const mc::Range& range)
std::pair<size_t, size_t> begin_end(const mc::Block& range)
{
const auto [offset, size] = range;
return {offset, offset + size};
}


mc::Range operator+(const mc::Range& r1, const mc::Range& r2)
mc::Block operator+(const mc::Block& r1, const mc::Block& r2)
{
auto [b1, e1] = begin_end(r1);
auto [b2, e2] = begin_end(r2);
assert(!((b1 > e2) && (b2 > e1)));
return mc::Range{std::min(b1, b2), std::max(e1, e2) - std::min(b1, b2)};
return mc::Block{std::min(b1, b2), std::max(e1, e2) - std::min(b1, b2)};
}


std::ostream& operator<<(std::ostream& os, const mc::Range& range)
std::ostream& operator<<(std::ostream& os, const mc::Block& range)
{
auto [rb, re] = begin_end(range);
os << "[" << rb << ", " << re << "]";
return os;
}


std::ostream& operator<<(std::ostream& os, const mc::RangeBucket& range)
std::ostream& operator<<(std::ostream& os, const mc::BlockBucket& range)
{
os << range.first << std::endl;
for (const auto& r : range.second) {
Expand All @@ -47,50 +47,58 @@ std::ostream& operator<<(std::ostream& os, const mc::RangeBucket& range)
}


mc::RangeBuckets& operator<<(mc::RangeBuckets& buckets, const mc::Range& r)
{
const mc::Range sub_range{r};
const auto [srb_tmp, sre_tmp] = begin_end(sub_range);
auto srb = srb_tmp; // not necessary in C++20
auto sre = sre_tmp; // not necessary in C++20

auto r1 = std::find_if(buckets.begin(), buckets.end(), [&](const auto bucket) {
const auto [bucket_range, _] = bucket;
const auto [brb, bre] = begin_end(bucket_range);
return brb <= srb && srb <= bre;
});

if (r1 != buckets.end()) {
r1->first = sub_range + r1->first;
r1->second.push_back(sub_range);
if (std::next(r1) != buckets.end()) {
auto r2 = std::next(r1);
auto [r2_begin, r2_end] = begin_end(r2->first);

if (r2_begin <= sre && sre <= r2_end) {
r1->first = r1->first + r2->first;
std::copy(r2->second.begin(), r2->second.end(), std::back_inserter(r1->second));
buckets.erase(r2);
}
}
return buckets;
}
auto r2 = std::find_if(buckets.begin(), buckets.end(), [&](auto l) {
auto [l_begin, l_end] = begin_end(l.first);
return l_begin <= sre && sre <= l_end;
});
if (r2 != buckets.end()) {
r2->first = sub_range + r1->first;
return buckets;
mc::BlockBuckets& operator<<(mc::BlockBuckets& buckets, const mc::Block& r) {

const mc::Block sub_range{r};
const auto [sub_start, sub_end] = begin_end(sub_range);

// Find the position where the range might be inserted
auto it = std::lower_bound(buckets.begin(), buckets.end(), sub_range,
[](const mc::BlockBucket& bucket, const mc::Block& range) {
const auto [bucket_start, bucket_end] = begin_end(bucket.first);
return bucket_end < range.first;
});

mc::Block merged_range = sub_range;
mc::SubBlocks merged_subranges = {sub_range};

// Merge with any overlapping buckets before the insertion point
while (it != buckets.begin()) {
auto prev_it = std::prev(it);
const auto [prev_start, prev_end] = begin_end(prev_it->first);

if (prev_end < sub_start) break; // No overlap

// Expand the merged range
merged_range.first = std::min(merged_range.first, prev_start);
merged_range.second = (std::max(prev_end, sub_end) - merged_range.first);

merged_subranges.insert(merged_subranges.end(), prev_it->second.begin(), prev_it->second.end());

it = buckets.erase(prev_it);
}

buckets.push_back({sub_range, mc::SubRanges{sub_range}});
// Merge with any overlapping buckets after the insertion point
while (it != buckets.end()) {
const auto [next_start, next_end] = begin_end(it->first);

if (next_start > sub_end) break; // No overlap

// Expand the merged range
merged_range.first = std::min(merged_range.first, next_start);
merged_range.second = (std::max(next_end, sub_end) - merged_range.first);

merged_subranges.insert(merged_subranges.end(), it->second.begin(), it->second.end());

// Erase the current bucket and move the iterator forward
it = buckets.erase(it);
}

buckets.insert(it, {merged_range, merged_subranges});
return buckets;
}


std::size_t std::hash<mc::Range>::operator() (const mc::Range& range) const
std::size_t std::hash<mc::Block>::operator() (const mc::Block& range) const
{
static_assert(sizeof(std::size_t) == sizeof(std::uint64_t), "std::size_t must be 64 bits");
const auto [offset, size] = range;
Expand Down
28 changes: 13 additions & 15 deletions src/gribjump/compression/Range.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,29 @@
#include <iostream>
#include <tuple>
#include <cstdint>

namespace mc {
using Range = std::pair<size_t, size_t>;
using Block = std::pair<size_t, size_t>;
}

template<>
struct std::hash<mc::Range> {
std::size_t operator()(const mc::Range& range) const;
struct std::hash<mc::Block> {
std::size_t operator()(const mc::Block& range) const;
};


namespace mc {

// A bucket is a continous range of data that can be decoded in one go
std::pair<size_t, size_t> get_begin_end(const Range& range);
using SubRange = Range;
using SubRanges = std::vector<SubRange>;
using RangeBucket = std::pair<Range, SubRanges>;
using RangeBuckets = std::list<RangeBucket>;

std::pair<size_t, size_t> get_begin_end(const Block& range);
using SubBlock = Block;
using SubBlocks = std::vector<SubBlock>;
using BlockBucket = std::pair<Block, SubBlocks>;
using BlockBuckets = std::vector<BlockBucket>; // Sorted to allow binary search (std::lower_bound)
} // namespace mc

std::ostream& operator<<(std::ostream& os, const mc::Range& range);
std::ostream& operator<<(std::ostream& os, const mc::RangeBucket& bucket);
mc::RangeBuckets& operator<<(mc::RangeBuckets& buckets, const mc::Range& r);
mc::Range operator+(const mc::Range& r1, const mc::Range& r2);
std::ostream& operator<<(std::ostream& os, const mc::Block& range);
std::ostream& operator<<(std::ostream& os, const mc::BlockBucket& bucket);
mc::BlockBuckets& operator<<(mc::BlockBuckets& buckets, const mc::Block& r);
mc::Block operator+(const mc::Block& r1, const mc::Block& r2);

std::pair<size_t, size_t> begin_end(const mc::Range& range);
std::pair<size_t, size_t> begin_end(const mc::Block& range);
2 changes: 1 addition & 1 deletion src/gribjump/compression/compressors/Aec.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ class AecDecompressor : public NumericDecompressor<ValueType>, public AecParams
}


Values decode(const std::shared_ptr<DataAccessor> accessor, const Range& range) override
Values decode(const std::shared_ptr<DataAccessor> accessor, const Block& range) override
{
if (sizeof(ValueType) == 1 && !(bits_per_sample_ > 0 && bits_per_sample_ <= 8))
throw eckit::Exception("bits_per_sample must be between 1 and 8 for 1-byte types", Here());
Expand Down
4 changes: 2 additions & 2 deletions src/gribjump/compression/compressors/Ccsds.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ class CcsdsDecompressor : public mc::NumericDecompressor<ValueType>, public Ccsd
}


Values decode(const std::shared_ptr<DataAccessor> accessor, const Range& range) override {
Values decode(const std::shared_ptr<DataAccessor> accessor, const Block& range) override {
if (range.second == 0)
return Values{};

Expand Down Expand Up @@ -245,7 +245,7 @@ class CcsdsDecompressor : public mc::NumericDecompressor<ValueType>, public Ccsd
size_t n_elems_;

template <typename SimpleValueType>
Values decode_range_ (const std::shared_ptr<DataAccessor> accessor, const Range& simple_range, double bscale, double dscale) {
Values decode_range_ (const std::shared_ptr<DataAccessor> accessor, const Block& simple_range, double bscale, double dscale) {
AecDecompressor<SimpleValueType> aec{};
auto flags = modify_aec_flags(flags_);
aec.flags(flags);
Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/compression/compressors/Simple.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class SimpleDecompressor : public NumericDecompressor<ValueType>, public SimpleP
size_t buffer_size() const { return buffer_size_; }
SimpleDecompressor& buffer_size(size_t buffer_size) { buffer_size_ = buffer_size; return *this; }

Values decode(const std::shared_ptr<DataAccessor> accessor, const Range& range) override {
Values decode(const std::shared_ptr<DataAccessor> accessor, const Block& range) override {
//SP sp{};
using SP = SimplePacking<ValueType>;
SimplePacking<double> sp{};
Expand All @@ -121,12 +121,12 @@ class SimpleDecompressor : public NumericDecompressor<ValueType>, public SimpleP
size_t end = offset + size;
size_t new_end = (end + (chunk_nvals - 1)) / chunk_nvals * chunk_nvals;
size_t new_size = new_end - new_offset;
Range inclusive_range{new_offset, new_size};
Block inclusive_range{new_offset, new_size};

params.n_vals = new_size;

size_t last_pos = std::min(bin_pos<ValueType>(new_size, bits_per_value_), accessor->eof() - bin_pos<ValueType>(new_offset, bits_per_value_));
Range data_range{bin_pos<ValueType>(new_offset, bits_per_value_), last_pos};
Block data_range{bin_pos<ValueType>(new_offset, bits_per_value_), last_pos};
eckit::Buffer compressed = accessor->read(data_range);

typename SP::Buffer data{(unsigned char*) compressed.data(), (unsigned char*) compressed.data() + data_range.second};
Expand Down
2 changes: 1 addition & 1 deletion src/gribjump/jumper/CcsdsJumper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ void CcsdsJumper::readValues(eckit::DataHandle& dh, const eckit::Offset offset,
.offsets(info.ccsdsOffsets());


auto data_range = mc::Range{offset + info.offsetBeforeData(), info.offsetAfterData() - info.offsetBeforeData()};
auto data_range = mc::Block{offset + info.offsetBeforeData(), info.offsetAfterData() - info.offsetBeforeData()};
std::shared_ptr<mc::DataAccessor> data_accessor = std::make_shared<GribJumpDataAccessor>(dh, data_range);

// TODO(maee): Optimize this
Expand Down
6 changes: 3 additions & 3 deletions src/gribjump/jumper/Jumper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ std::vector<std::bitset<64>> toBitset(const Bitmap& bitmap) {

// Convert ranges to intervals
// TODO(maee): Simplification: Switch to intervals or ranges
std::vector<mc::Range> toRanges(const std::vector<Interval>& intervals) {
std::vector<mc::Range> ranges;
std::vector<mc::Block> toRanges(const std::vector<Interval>& intervals) {
std::vector<mc::Block> ranges;
std::transform(intervals.begin(), intervals.end(), std::back_inserter(ranges), [](auto interval) {
auto [begin, end] = interval;
return mc::Range{begin, end - begin};
return mc::Block{begin, end - begin};
});
return ranges;
}
Expand Down
2 changes: 1 addition & 1 deletion src/gribjump/jumper/Jumper.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,6 @@ class BadJumpInfoException : public eckit::Exception {

// Convert ranges to intervals
// TODO(maee): Simplification: Switch to intervals or ranges
std::vector<mc::Range> toRanges(const std::vector<Interval>& intervals);
std::vector<mc::Block> toRanges(const std::vector<Interval>& intervals);

} // namespace gribjump
2 changes: 1 addition & 1 deletion src/gribjump/jumper/SimpleJumper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ void SimpleJumper::readValues(eckit::DataHandle& dh, const eckit::Offset offset,
const SimpleInfo& info = *psimple;


std::shared_ptr<mc::DataAccessor> data_accessor = std::make_shared<GribJumpDataAccessor>(dh, mc::Range{offset + info.offsetBeforeData(), info.offsetAfterData() - info.offsetBeforeData()}); // TODO XXX
std::shared_ptr<mc::DataAccessor> data_accessor = std::make_shared<GribJumpDataAccessor>(dh, mc::Block{offset + info.offsetBeforeData(), info.offsetAfterData() - info.offsetBeforeData()}); // TODO XXX
mc::SimpleDecompressor<double> simple{};
simple
.bits_per_value(info.bitsPerValue())
Expand Down
Loading

0 comments on commit 689f6b4

Please sign in to comment.