From 55fda3f2bfdfb9b41fd6e71162da8c0f437a6248 Mon Sep 17 00:00:00 2001 From: Chris Bradley Date: Thu, 21 Nov 2024 01:31:30 +0000 Subject: [PATCH] Speed up serialisation of ExtractionResult and ExtractionRequest --- src/gribjump/ExtractionData.cc | 157 ++++++++++++++++++--------- src/gribjump/remote/RemoteGribJump.h | 2 +- 2 files changed, 109 insertions(+), 50 deletions(-) diff --git a/src/gribjump/ExtractionData.cc b/src/gribjump/ExtractionData.cc index 2f576f0..01ce997 100644 --- a/src/gribjump/ExtractionData.cc +++ b/src/gribjump/ExtractionData.cc @@ -18,23 +18,117 @@ namespace gribjump { namespace { -void encodeVector(eckit::Stream& s, const std::vector& v) { +template +void encodeVector(eckit::Stream& s, const std::vector& v) { size_t size = v.size(); s << size; - eckit::Buffer buffer(v.data(), size * sizeof(double)); + eckit::Buffer buffer(v.data(), size * sizeof(T)); s << buffer; } -std::vector decodeVector(eckit::Stream& s) { +template +std::vector decodeVector(eckit::Stream& s) { size_t size; s >> size; - eckit::Buffer buffer(size * sizeof(double)); + eckit::Buffer buffer(size * sizeof(T)); s >> buffer; - double* data = (double*) buffer.data(); + T* data = (T*) buffer.data(); - return std::vector(data, data + size); + return std::vector(data, data + size); } +template +void encodeVectorVector(eckit::Stream& s, const std::vector>& vv) { + std::vector sizes; + sizes.reserve(vv.size()); + size_t totalSize = 0; + for (auto& v : vv) { + sizes.push_back(v.size()); + totalSize += v.size(); + } + encodeVector(s, sizes); + + // Make a flat vector + std::vector flat; + flat.reserve(totalSize); + for (auto& v : vv) { + flat.insert(flat.end(), v.begin(), v.end()); + } + encodeVector(s, flat); +} + +template +std::vector> decodeVectorVector(eckit::Stream& s) { + std::vector sizes = decodeVector(s); + std::vector flat = decodeVector(s); + + std::vector> vv; + size_t pos = 0; + for (auto& size : sizes) { + vv.push_back(std::vector(flat.begin() + pos, flat.begin() + pos + size)); + pos += size; + } + return vv; +} + +void encodeRanges(eckit::Stream& s, const std::vector& ranges) { + size_t size = ranges.size(); + s << size; + eckit::Buffer buffer(ranges.data(), size * sizeof(size_t)*2); // does this really work? + s << buffer; +} + +std::vector decodeRanges(eckit::Stream& s) { + size_t size; + s >> size; + eckit::Buffer buffer(size * sizeof(size_t)*2); + s >> buffer; + size_t* data = (size_t*) buffer.data(); + + std::vector ranges; + for (size_t i = 0; i < size; i++) { + ranges.push_back(std::make_pair(data[i*2], data[i*2+1])); + } + + return ranges; +} + +void encodeMask(eckit::Stream& s, const std::vector>>& mask) { + + size_t totalSize = 0; + std::vector sizes; + for (auto& v : mask) { + totalSize += v.size(); + sizes.push_back(v.size()); + } + std::vector flat; + flat.reserve(totalSize); + for (auto& v : mask) { + for (auto& b : v) { + flat.push_back(b.to_ullong()); + } + } + encodeVector(s, sizes); + encodeVector(s, flat); +} + +std::vector>> decodeMask(eckit::Stream& s) { + + std::vector sizes = decodeVector(s); + std::vector flat = decodeVector(s); + + std::vector>> mask; + size_t pos = 0; + for (auto& size : sizes) { + std::vector> maskBitset; + for (size_t i = 0; i < size; i++) { + maskBitset.push_back(std::bitset<64>(flat[pos + i])); + } + mask.push_back(maskBitset); + pos += size; + } + return mask; +} } // namespace ExtractionResult::ExtractionResult() {} @@ -45,21 +139,13 @@ ExtractionResult::ExtractionResult(std::vector> values, std: {} ExtractionResult::ExtractionResult(eckit::Stream& s) { - size_t numRanges; - s >> numRanges; - for (size_t i = 0; i < numRanges; i++) { - values_.push_back(decodeVector(s)); - } + values_ = decodeVectorVector(s); + mask_ = decodeMask(s); +} - std::vector> bitsetStrings; - s >> bitsetStrings; - for (auto& v : bitsetStrings) { - std::vector> bitset; - for (auto& b : v) { - bitset.push_back(std::bitset<64>(b)); - } - mask_.push_back(bitset); - } +void ExtractionResult::encode(eckit::Stream& s) const { + encodeVectorVector(s, values_); + encodeMask(s, mask_); } void ExtractionResult::values_ptr(double*** values, unsigned long* nrange, unsigned long** nvalues) { @@ -72,24 +158,6 @@ void ExtractionResult::values_ptr(double*** values, unsigned long* nrange, unsig } } -void ExtractionResult::encode(eckit::Stream& s) const { - - s << values_.size(); // vector of vectors - for (auto& v : values_) { - encodeVector(s, v); - } - - std::vector> bitsetStrings; - for (auto& v : mask_) { - std::vector bitsetString; - for (auto& b : v) { - bitsetString.push_back(b.to_string()); - } - bitsetStrings.push_back(bitsetString); - } - s << bitsetStrings; -} - void ExtractionResult::print(std::ostream& s) const { s << "ExtractionResult[Values:["; for (auto& v : values_) { @@ -129,13 +197,7 @@ ExtractionRequest::ExtractionRequest() {} ExtractionRequest::ExtractionRequest(eckit::Stream& s) { s >> request_; s >> gridHash_; - size_t numRanges; - s >> numRanges; - for (size_t j = 0; j < numRanges; j++) { - size_t start, end; - s >> start >> end; - ranges_.push_back(std::make_pair(start, end)); - } + ranges_ = decodeRanges(s); } eckit::Stream& operator<<(eckit::Stream& s, const ExtractionRequest& o) { @@ -146,10 +208,7 @@ eckit::Stream& operator<<(eckit::Stream& s, const ExtractionRequest& o) { void ExtractionRequest::encode(eckit::Stream& s) const { s << request_; s << gridHash_; - s << ranges_.size(); - for (auto& [start, end] : ranges_) { - s << start << end; - } + encodeRanges(s, ranges_); } void ExtractionRequest::print(std::ostream& s) const { diff --git a/src/gribjump/remote/RemoteGribJump.h b/src/gribjump/remote/RemoteGribJump.h index eb6e54c..ce0ac04 100644 --- a/src/gribjump/remote/RemoteGribJump.h +++ b/src/gribjump/remote/RemoteGribJump.h @@ -24,7 +24,7 @@ enum class RequestType : uint16_t { SCAN, FORWARD_EXTRACT }; -constexpr static uint16_t remoteProtocolVersion = 1; +constexpr static uint16_t remoteProtocolVersion = 2; class RemoteGribJump : public GribJumpBase {