Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: upadate datasketches lib from 5.0.2 to 5.2.0 #746

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions 3rd/datasketches/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ target_sources(common
${CMAKE_CURRENT_SOURCE_DIR}/include/quantiles_sorted_view_impl.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/optional.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/version.hpp.in
${CMAKE_CURRENT_SOURCE_DIR}/include/xxhash64.h
)
11 changes: 5 additions & 6 deletions 3rd/datasketches/common/include/MurmurHash3.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ typedef struct {
// Block read - if your platform needs to do endian-swapping or can only
// handle aligned reads, do the conversion here

MURMUR3_FORCE_INLINE uint64_t getblock64 ( const uint64_t * p, size_t i )
MURMUR3_FORCE_INLINE uint64_t getblock64 ( const uint8_t * p, size_t i )
{
uint64_t res;
memcpy(&res, p + i, sizeof(res));
memcpy(&res, p + i * sizeof(uint64_t), sizeof(res));
return res;
}

Expand Down Expand Up @@ -104,13 +104,12 @@ MURMUR3_FORCE_INLINE void MurmurHash3_x64_128(const void* key, size_t lenBytes,

// Number of full 128-bit blocks of 16 bytes.
// Possible exclusion of a remainder of up to 15 bytes.
const size_t nblocks = lenBytes >> 4; // bytes / 16
const size_t nblocks = lenBytes >> 4; // bytes / 16

// Process the 128-bit blocks (the body) into the hash
const uint64_t* blocks = (const uint64_t*)(data);
for (size_t i = 0; i < nblocks; ++i) { // 16 bytes per block
uint64_t k1 = getblock64(blocks, i * 2 + 0);
uint64_t k2 = getblock64(blocks, i * 2 + 1);
uint64_t k1 = getblock64(data, i * 2 + 0);
uint64_t k2 = getblock64(data, i * 2 + 1);

k1 *= c1; k1 = MURMUR3_ROTL64(k1,31); k1 *= c2; out.h1 ^= k1;
out.h1 = MURMUR3_ROTL64(out.h1,27);
Expand Down
18 changes: 18 additions & 0 deletions 3rd/datasketches/common/include/common_defs.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace random_utils {
static std::random_device rd; // possibly unsafe in MinGW with GCC < 9.2
static thread_local std::mt19937_64 rand(rd());
static thread_local std::uniform_real_distribution<> next_double(0.0, 1.0);
static thread_local std::uniform_int_distribution<uint64_t> next_uint64(0, UINT64_MAX);

// thread-safe random bit
static thread_local std::independent_bits_engine<std::mt19937, 1, uint32_t>
Expand Down Expand Up @@ -91,6 +92,23 @@ static inline void write(std::ostream& os, const T* ptr, size_t size_bytes) {
os.write(reinterpret_cast<const char*>(ptr), size_bytes);
}

template<typename T>
T byteswap(T value) {
char* ptr = static_cast<char*>(static_cast<void*>(&value));
const int len = sizeof(T);
for (size_t i = 0; i < len / 2; ++i) {
std::swap(ptr[i], ptr[len - i - 1]);
}
return value;
}

template<typename T>
static inline T read_big_endian(std::istream& is) {
T value;
is.read(reinterpret_cast<char*>(&value), sizeof(T));
return byteswap(value);
}

// wrapper for iterators to implement operator-> returning temporary value
template<typename T>
class return_value_holder {
Expand Down
12 changes: 5 additions & 7 deletions 3rd/datasketches/common/include/quantiles_sorted_view_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,19 +86,17 @@ auto quantiles_sorted_view<T, C, A>::get_quantile(double rank, bool inclusive) c
template<typename T, typename C, typename A>
auto quantiles_sorted_view<T, C, A>::get_CDF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
if (entries_.empty()) throw std::runtime_error("operation is undefined for an empty sketch");
vector_double buckets(entries_.get_allocator());
if (entries_.size() == 0) return buckets;
check_split_points(split_points, size);
buckets.reserve(size + 1);
for (uint32_t i = 0; i < size; ++i) buckets.push_back(get_rank(split_points[i], inclusive));
buckets.push_back(1);
return buckets;
vector_double ranks(entries_.get_allocator());
ranks.reserve(size + 1);
for (uint32_t i = 0; i < size; ++i) ranks.push_back(get_rank(split_points[i], inclusive));
ranks.push_back(1);
return ranks;
}

template<typename T, typename C, typename A>
auto quantiles_sorted_view<T, C, A>::get_PMF(const T* split_points, uint32_t size, bool inclusive) const -> vector_double {
auto buckets = get_CDF(split_points, size, inclusive);
if (buckets.size() == 0) return buckets;
for (uint32_t i = size; i > 0; --i) {
buckets[i] -= buckets[i - 1];
}
Expand Down
202 changes: 202 additions & 0 deletions 3rd/datasketches/common/include/xxhash64.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,202 @@
// //////////////////////////////////////////////////////////
// xxhash64.h
// Copyright (c) 2016 Stephan Brumme. All rights reserved.
// see http://create.stephan-brumme.com/disclaimer.html
//

#pragma once
#include <stdint.h> // for uint32_t and uint64_t

/// XXHash (64 bit), based on Yann Collet's descriptions, see http://cyan4973.github.io/xxHash/
/** How to use:
uint64_t myseed = 0;
XXHash64 myhash(myseed);
myhash.add(pointerToSomeBytes, numberOfBytes);
myhash.add(pointerToSomeMoreBytes, numberOfMoreBytes); // call add() as often as you like to ...
// and compute hash:
uint64_t result = myhash.hash();

// or all of the above in one single line:
uint64_t result2 = XXHash64::hash(mypointer, numBytes, myseed);

Note: my code is NOT endian-aware !
**/
class XXHash64
{
public:
/// create new XXHash (64 bit)
/** @param seed your seed value, even zero is a valid seed **/
explicit XXHash64(uint64_t seed)
{
state[0] = seed + Prime1 + Prime2;
state[1] = seed + Prime2;
state[2] = seed;
state[3] = seed - Prime1;
bufferSize = 0;
totalLength = 0;
}

/// add a chunk of bytes
/** @param input pointer to a continuous block of data
@param length number of bytes
@return false if parameters are invalid / zero **/
bool add(const void* input, uint64_t length)
{
// no data ?
if (!input || length == 0)
return false;

totalLength += length;
// byte-wise access
const unsigned char* data = (const unsigned char*)input;

// unprocessed old data plus new data still fit in temporary buffer ?
if (bufferSize + length < MaxBufferSize)
{
// just add new data
while (length-- > 0)
buffer[bufferSize++] = *data++;
return true;
}

// point beyond last byte
const unsigned char* stop = data + length;
const unsigned char* stopBlock = stop - MaxBufferSize;

// some data left from previous update ?
if (bufferSize > 0)
{
// make sure temporary buffer is full (16 bytes)
while (bufferSize < MaxBufferSize)
buffer[bufferSize++] = *data++;

// process these 32 bytes (4x8)
process(buffer, state[0], state[1], state[2], state[3]);
}

// copying state to local variables helps optimizer A LOT
uint64_t s0 = state[0], s1 = state[1], s2 = state[2], s3 = state[3];
// 32 bytes at once
while (data <= stopBlock)
{
// local variables s0..s3 instead of state[0]..state[3] are much faster
process(data, s0, s1, s2, s3);
data += 32;
}
// copy back
state[0] = s0; state[1] = s1; state[2] = s2; state[3] = s3;

// copy remainder to temporary buffer
bufferSize = stop - data;
for (uint64_t i = 0; i < bufferSize; i++)
buffer[i] = data[i];

// done
return true;
}

/// get current hash
/** @return 64 bit XXHash **/
uint64_t hash() const
{
// fold 256 bit state into one single 64 bit value
uint64_t result;
if (totalLength >= MaxBufferSize)
{
result = rotateLeft(state[0], 1) +
rotateLeft(state[1], 7) +
rotateLeft(state[2], 12) +
rotateLeft(state[3], 18);
result = (result ^ processSingle(0, state[0])) * Prime1 + Prime4;
result = (result ^ processSingle(0, state[1])) * Prime1 + Prime4;
result = (result ^ processSingle(0, state[2])) * Prime1 + Prime4;
result = (result ^ processSingle(0, state[3])) * Prime1 + Prime4;
}
else
{
// internal state wasn't set in add(), therefore original seed is still stored in state2
result = state[2] + Prime5;
}

result += totalLength;

// process remaining bytes in temporary buffer
const unsigned char* data = buffer;
// point beyond last byte
const unsigned char* stop = data + bufferSize;

// at least 8 bytes left ? => eat 8 bytes per step
for (; data + 8 <= stop; data += 8)
result = rotateLeft(result ^ processSingle(0, *(uint64_t*)data), 27) * Prime1 + Prime4;

// 4 bytes left ? => eat those
if (data + 4 <= stop)
{
result = rotateLeft(result ^ (*(uint32_t*)data) * Prime1, 23) * Prime2 + Prime3;
data += 4;
}

// take care of remaining 0..3 bytes, eat 1 byte per step
while (data != stop)
result = rotateLeft(result ^ (*data++) * Prime5, 11) * Prime1;

// mix bits
result ^= result >> 33;
result *= Prime2;
result ^= result >> 29;
result *= Prime3;
result ^= result >> 32;
return result;
}


/// combine constructor, add() and hash() in one static function (C style)
/** @param input pointer to a continuous block of data
@param length number of bytes
@param seed your seed value, e.g. zero is a valid seed
@return 64 bit XXHash **/
static uint64_t hash(const void* input, uint64_t length, uint64_t seed)
{
XXHash64 hasher(seed);
hasher.add(input, length);
return hasher.hash();
}

private:
/// magic constants :-)
static const uint64_t Prime1 = 11400714785074694791ULL;
static const uint64_t Prime2 = 14029467366897019727ULL;
static const uint64_t Prime3 = 1609587929392839161ULL;
static const uint64_t Prime4 = 9650029242287828579ULL;
static const uint64_t Prime5 = 2870177450012600261ULL;

/// temporarily store up to 31 bytes between multiple add() calls
static const uint64_t MaxBufferSize = 31+1;

uint64_t state[4];
unsigned char buffer[MaxBufferSize];
uint64_t bufferSize;
uint64_t totalLength;

/// rotate bits, should compile to a single CPU instruction (ROL)
static inline uint64_t rotateLeft(uint64_t x, unsigned char bits)
{
return (x << bits) | (x >> (64 - bits));
}

/// process a single 64 bit value
static inline uint64_t processSingle(uint64_t previous, uint64_t input)
{
return rotateLeft(previous + input * Prime2, 31) * Prime1;
}

/// process a block of 4x4 bytes, this is the main part of the XXHash32 algorithm
static inline void process(const void* data, uint64_t& state0, uint64_t& state1, uint64_t& state2, uint64_t& state3)
{
const uint64_t* block = (const uint64_t*) data;
state0 = processSingle(state0, block[0]);
state1 = processSingle(state1, block[1]);
state2 = processSingle(state2, block[2]);
state3 = processSingle(state3, block[3]);
}
};
8 changes: 7 additions & 1 deletion 3rd/datasketches/cpc/include/cpc_compressor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ template<typename A> class cpc_compressor;
template<typename A>
inline cpc_compressor<A>& get_compressor();

// function called atexit to clean up compression tables
template<typename A>
void destroy_compressor();

template<typename A>
class cpc_compressor {
public:
Expand Down Expand Up @@ -109,8 +113,10 @@ class cpc_compressor {
};

cpc_compressor();
template<typename T> friend cpc_compressor<T>& get_compressor();
friend cpc_compressor& get_compressor<A>();

~cpc_compressor();
friend void destroy_compressor<A>();

void make_decoding_tables(); // call this at startup
void free_decoding_tables(); // call this at the end
Expand Down
10 changes: 10 additions & 0 deletions 3rd/datasketches/cpc/include/cpc_compressor_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
#ifndef CPC_COMPRESSOR_IMPL_HPP_
#define CPC_COMPRESSOR_IMPL_HPP_

#include <cstdlib>
#include <memory>
#include <stdexcept>

#include "common_defs.hpp"
#include "compression_data.hpp"
#include "cpc_util.hpp"
#include "cpc_common.hpp"
Expand All @@ -36,9 +38,17 @@ namespace datasketches {
template<typename A>
cpc_compressor<A>& get_compressor() {
static cpc_compressor<A>* instance = new cpc_compressor<A>(); // use new for global initialization
static int reg_result = std::atexit(destroy_compressor<A>); // just to clean up a little more nicely; don't worry if it fails
unused(reg_result);
return *instance;
}

// register to call compressor destructor at exit
template<typename A>
void destroy_compressor() {
delete std::addressof(get_compressor<A>());
}

template<typename A>
cpc_compressor<A>::cpc_compressor() {
make_decoding_tables();
Expand Down
Loading