Skip to content

Commit

Permalink
Speed up serialisation of ExtractionResult and ExtractionRequest
Browse files Browse the repository at this point in the history
  • Loading branch information
ChrisspyB committed Nov 21, 2024
1 parent 689f6b4 commit 55fda3f
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 50 deletions.
157 changes: 108 additions & 49 deletions src/gribjump/ExtractionData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,117 @@ namespace gribjump {

namespace {

void encodeVector(eckit::Stream& s, const std::vector<double>& v) {
template <typename T>
void encodeVector(eckit::Stream& s, const std::vector<T>& v) {
size_t size = v.size();
s << size;
eckit::Buffer buffer(v.data(), size * sizeof(double));
eckit::Buffer buffer(v.data(), size * sizeof(T));
s << buffer;
}

std::vector<double> decodeVector(eckit::Stream& s) {
template <typename T>
std::vector<T> decodeVector(eckit::Stream& s) {
size_t size;
s >> size;
eckit::Buffer buffer(size * sizeof(double));
eckit::Buffer buffer(size * sizeof(T));
s >> buffer;
double* data = (double*) buffer.data();
T* data = (T*) buffer.data();

return std::vector<double>(data, data + size);
return std::vector<T>(data, data + size);
}

// Serialise a vector of vectors as two flat arrays: first the length of every
// inner vector, then all elements concatenated in order. Both arrays are
// written with encodeVector.
template <typename T>
void encodeVectorVector(eckit::Stream& s, const std::vector<std::vector<T>>& vv) {
    const size_t outerCount = vv.size();

    // Pass 1: collect the inner lengths and the overall element count.
    std::vector<size_t> lengths;
    lengths.reserve(outerCount);
    size_t elementCount = 0;
    for (size_t i = 0; i < outerCount; ++i) {
        const size_t len = vv[i].size();
        lengths.push_back(len);
        elementCount += len;
    }
    encodeVector(s, lengths);

    // Pass 2: concatenate every inner vector into one contiguous array so the
    // payload goes out as a single blob.
    std::vector<T> joined;
    joined.reserve(elementCount);
    for (size_t i = 0; i < outerCount; ++i) {
        const std::vector<T>& inner = vv[i];
        joined.insert(joined.end(), inner.begin(), inner.end());
    }
    encodeVector(s, joined);
}

// Inverse of encodeVectorVector: reads the array of inner lengths, then the
// flat element array, and slices the flat array back into one vector per
// recorded length.
template <typename T>
std::vector<std::vector<T>> decodeVectorVector(eckit::Stream& s) {
    const std::vector<size_t> sizes = decodeVector<size_t>(s);
    const std::vector<T> flat = decodeVector<T>(s);

    std::vector<std::vector<T>> vv;
    vv.reserve(sizes.size());  // one allocation for the outer vector

    // NOTE(review): the lengths are taken on trust; if they sum to more than
    // flat.size() the slices below run past the end of `flat`. Consider
    // validating against flat.size() with the project's preferred error type.
    auto first = flat.begin();
    for (const size_t count : sizes) {
        const auto last = first + count;
        vv.emplace_back(first, last);  // build each slice directly in place
        first = last;
    }
    return vv;
}

// Serialise a list of ranges as the range count followed by one blob holding
// 2 * count size_t values: start0, end0, start1, end1, ...
//
// The values are written into the buffer one by one instead of copying
// ranges.data() as raw bytes, so the output does not depend on the in-memory
// layout of Range. The wire format is unchanged and matches what decodeRanges
// reads back.
void encodeRanges(eckit::Stream& s, const std::vector<Range>& ranges) {
    const size_t size = ranges.size();
    s << size;

    eckit::Buffer buffer(size * sizeof(size_t) * 2);
    size_t* out = static_cast<size_t*>(buffer.data());
    for (const auto& [start, end] : ranges) {
        *out++ = start;
        *out++ = end;
    }
    s << buffer;
}

// Inverse of encodeRanges: reads the range count, then a blob of
// 2 * count size_t values, and rebuilds each range from a consecutive
// (start, end) pair.
std::vector<Range> decodeRanges(eckit::Stream& s) {
    size_t size = 0;
    s >> size;
    eckit::Buffer buffer(size * sizeof(size_t) * 2);
    s >> buffer;
    // NOTE(review): the blob actually received is assumed to hold at least
    // 2 * size values; nothing here checks its length against `size`.
    const size_t* data = static_cast<const size_t*>(buffer.data());

    std::vector<Range> ranges;
    ranges.reserve(size);  // final length is known up front
    for (size_t i = 0; i < size; ++i) {
        ranges.push_back(std::make_pair(data[2 * i], data[2 * i + 1]));
    }

    return ranges;
}

// Serialise the mask as two flat arrays: the number of 64-bit words in each
// inner vector, then every word converted to an unsigned 64-bit integer, in
// order. Both arrays are written with encodeVector.
void encodeMask(eckit::Stream& s, const std::vector<std::vector<std::bitset<64>>>& mask) {

    std::vector<size_t> sizes;
    sizes.reserve(mask.size());  // same pre-sizing as encodeVectorVector
    size_t totalSize = 0;
    for (const auto& row : mask) {
        sizes.push_back(row.size());
        totalSize += row.size();
    }

    std::vector<uint64_t> flat;
    flat.reserve(totalSize);
    for (const auto& row : mask) {
        for (const auto& bits : row) {
            flat.push_back(bits.to_ullong());
        }
    }

    // Stream order is part of the format: sizes first, then the words.
    encodeVector(s, sizes);
    encodeVector(s, flat);
}

// Inverse of encodeMask: reads the per-row word counts and the flat array of
// 64-bit words, then rebuilds one vector of bitsets per row.
//
// Words are fetched with at(), so a size table that claims more words than
// were received raises std::out_of_range instead of reading past the end of
// the flat array.
std::vector<std::vector<std::bitset<64>>> decodeMask(eckit::Stream& s) {

    const std::vector<size_t> sizes = decodeVector<size_t>(s);
    const std::vector<uint64_t> flat = decodeVector<uint64_t>(s);

    std::vector<std::vector<std::bitset<64>>> mask;
    mask.reserve(sizes.size());

    size_t pos = 0;  // invariant: pos <= flat.size()
    for (const size_t count : sizes) {
        // Fill the row in place rather than building a temporary and copying it.
        mask.emplace_back();
        std::vector<std::bitset<64>>& row = mask.back();

        // Only pre-size when the count is plausible, so a corrupt count cannot
        // trigger a huge allocation before at() rejects it.
        if (count <= flat.size() - pos) {
            row.reserve(count);
        }
        for (size_t i = 0; i < count; ++i) {
            row.push_back(std::bitset<64>(flat.at(pos + i)));
        }
        pos += count;
    }
    return mask;
}
} // namespace

ExtractionResult::ExtractionResult() {}
Expand All @@ -45,21 +139,13 @@ ExtractionResult::ExtractionResult(std::vector<std::vector<double>> values, std:
{}

ExtractionResult::ExtractionResult(eckit::Stream& s) {
size_t numRanges;
s >> numRanges;
for (size_t i = 0; i < numRanges; i++) {
values_.push_back(decodeVector(s));
}
values_ = decodeVectorVector<double>(s);
mask_ = decodeMask(s);
}

std::vector<std::vector<std::string>> bitsetStrings;
s >> bitsetStrings;
for (auto& v : bitsetStrings) {
std::vector<std::bitset<64>> bitset;
for (auto& b : v) {
bitset.push_back(std::bitset<64>(b));
}
mask_.push_back(bitset);
}
// Write this result to the stream: values_ first, then mask_.
// The order mirrors the stream constructor, which decodes values_ and then mask_.
void ExtractionResult::encode(eckit::Stream& s) const {
encodeVectorVector(s, values_);
encodeMask(s, mask_);
}

void ExtractionResult::values_ptr(double*** values, unsigned long* nrange, unsigned long** nvalues) {
Expand All @@ -72,24 +158,6 @@ void ExtractionResult::values_ptr(double*** values, unsigned long* nrange, unsig
}
}

void ExtractionResult::encode(eckit::Stream& s) const {

s << values_.size(); // vector of vectors
for (auto& v : values_) {
encodeVector(s, v);
}

std::vector<std::vector<std::string>> bitsetStrings;
for (auto& v : mask_) {
std::vector<std::string> bitsetString;
for (auto& b : v) {
bitsetString.push_back(b.to_string());
}
bitsetStrings.push_back(bitsetString);
}
s << bitsetStrings;
}

void ExtractionResult::print(std::ostream& s) const {
s << "ExtractionResult[Values:[";
for (auto& v : values_) {
Expand Down Expand Up @@ -129,13 +197,7 @@ ExtractionRequest::ExtractionRequest() {}
ExtractionRequest::ExtractionRequest(eckit::Stream& s) {
s >> request_;
s >> gridHash_;
size_t numRanges;
s >> numRanges;
for (size_t j = 0; j < numRanges; j++) {
size_t start, end;
s >> start >> end;
ranges_.push_back(std::make_pair(start, end));
}
ranges_ = decodeRanges(s);
}

eckit::Stream& operator<<(eckit::Stream& s, const ExtractionRequest& o) {
Expand All @@ -146,10 +208,7 @@ eckit::Stream& operator<<(eckit::Stream& s, const ExtractionRequest& o) {
void ExtractionRequest::encode(eckit::Stream& s) const {
s << request_;
s << gridHash_;
s << ranges_.size();
for (auto& [start, end] : ranges_) {
s << start << end;
}
encodeRanges(s, ranges_);
}

void ExtractionRequest::print(std::ostream& s) const {
Expand Down
2 changes: 1 addition & 1 deletion src/gribjump/remote/RemoteGribJump.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ enum class RequestType : uint16_t {
SCAN,
FORWARD_EXTRACT
};
constexpr static uint16_t remoteProtocolVersion = 1;
constexpr static uint16_t remoteProtocolVersion = 2;

class RemoteGribJump : public GribJumpBase {

Expand Down

0 comments on commit 55fda3f

Please sign in to comment.