#pragma once
#include
#include
#include
#include
#include
#include
#include
// We start with 128 bins and grow the number of bins by 128
// each time we need to extend the range of the bins.
// This is done to avoid reallocating the bins vector too often.
constexpr UInt32 CHUNK_SIZE = 128;
namespace DB
{
class DDSketchDenseStore
{
public:
Float64 count = 0;
int min_key = std::numeric_limits::max();
int max_key = std::numeric_limits::min();
int offset = 0;
std::vector bins;
explicit DDSketchDenseStore(UInt32 chunk_size_ = CHUNK_SIZE) : chunk_size(chunk_size_) {}
void copy(DDSketchDenseStore* other)
{
bins = other->bins;
count = other->count;
min_key = other->min_key;
max_key = other->max_key;
offset = other->offset;
}
int length() const
{
return static_cast(bins.size());
}
void add(int key, Float64 weight)
{
int idx = getIndex(key);
bins[idx] += weight;
count += weight;
}
int keyAtRank(Float64 rank, bool lower)
{
Float64 running_ct = 0.0;
for (size_t i = 0; i < bins.size(); ++i)
{
running_ct += bins[i];
if ((lower && running_ct > rank) || (!lower && running_ct >= rank + 1))
{
return static_cast(i) + offset;
}
}
return max_key;
}
void merge(DDSketchDenseStore* other)
{
if (other->count == 0) return;
if (count == 0)
{
copy(other);
return;
}
if (other->min_key < min_key || other->max_key > max_key)
{
extendRange(other->min_key, other->max_key);
}
for (int key = other->min_key; key <= other->max_key; ++key)
{
bins[key - offset] += other->bins[key - other->offset];
}
count += other->count;
}
/// NOLINTBEGIN(readability-static-accessed-through-instance)
void serialize(WriteBuffer& buf) const
{
// Calculate the size of the dense and sparse encodings to choose the smallest one
UInt64 num_bins = 0, num_non_empty_bins = 0;
if (count != 0)
{
num_bins = max_key - min_key + 1;
}
size_t sparse_encoding_overhead = 0;
for (int index = min_key; index <= max_key; ++index)
{
if (bins[index - offset] != 0)
{
num_non_empty_bins++;
sparse_encoding_overhead += 2; // 2 bytes for index delta
}
}
size_t dense_encoding_overhead = (num_bins - num_non_empty_bins) * estimatedFloatSize(0.0);
// Choose the smallest encoding and write to buffer
if (dense_encoding_overhead <= sparse_encoding_overhead)
{
// Write the dense encoding
writeBinary(enc.BinEncodingContiguousCounts, buf); // Flag for dense encoding
writeVarUInt(num_bins, buf);
writeVarInt(min_key, buf);
writeVarInt(1, buf); // indexDelta in dense encoding
for (int index = min_key; index <= max_key; ++index)
{
writeFloatBinary(bins[index - offset], buf);
}
}
else
{
// Write the sparse encoding
writeBinary(enc.BinEncodingIndexDeltasAndCounts, buf); // Flag for sparse encoding
writeVarUInt(num_non_empty_bins, buf);
int previous_index = 0;
for (int index = min_key; index <= max_key; ++index)
{
Float64 bin_count = bins[index - offset];
if (bin_count != 0)
{
writeVarInt(index - previous_index, buf);
writeFloatBinary(bin_count, buf);
previous_index = index;
}
}
}
}
void deserialize(ReadBuffer& buf)
{
UInt8 encoding_mode;
readBinary(encoding_mode, buf);
if (encoding_mode == enc.BinEncodingContiguousCounts)
{
UInt64 num_bins;
readVarUInt(num_bins, buf);
int start_key;
readVarInt(start_key, buf);
int index_delta;
readVarInt(index_delta, buf);
for (UInt64 i = 0; i < num_bins; ++i)
{
Float64 bin_count;
readFloatBinary(bin_count, buf);
add(start_key, bin_count);
start_key += index_delta;
}
}
else
{
UInt64 num_non_empty_bins;
readVarUInt(num_non_empty_bins, buf);
int previous_index = 0;
for (UInt64 i = 0; i < num_non_empty_bins; ++i)
{
int index_delta;
readVarInt(index_delta, buf);
Float64 bin_count;
readFloatBinary(bin_count, buf);
previous_index += index_delta;
add(previous_index, bin_count);
}
}
}
/// NOLINTEND(readability-static-accessed-through-instance)
private:
UInt32 chunk_size;
DDSketchEncoding enc;
int getIndex(int key)
{
if (key < min_key || key > max_key)
{
extendRange(key, key);
}
return key - offset;
}
UInt32 getNewLength(int new_min_key, int new_max_key) const
{
int desired_length = new_max_key - new_min_key + 1;
return static_cast(chunk_size * std::ceil(static_cast(desired_length) / chunk_size)); // Fixed float conversion
}
void extendRange(int key, int second_key)
{
int new_min_key = std::min({key, min_key});
int new_max_key = std::max({second_key, max_key});
if (length() == 0)
{
bins = std::vector(getNewLength(new_min_key, new_max_key), 0.0);
offset = new_min_key;
adjust(new_min_key, new_max_key);
}
else if (new_min_key >= offset && new_max_key < offset + length())
{
min_key = new_min_key;
max_key = new_max_key;
}
else
{
UInt32 new_length = getNewLength(new_min_key, new_max_key);
if (new_length > bins.size())
{
bins.resize(new_length);
bins.resize(bins.capacity());
}
adjust(new_min_key, new_max_key);
}
}
void adjust(int new_min_key, int new_max_key)
{
centerBins(new_min_key, new_max_key);
min_key = new_min_key;
max_key = new_max_key;
}
void shiftBins(int shift)
{
int new_offset = offset - shift;
if (new_offset > offset)
std::rotate(bins.begin(), bins.begin() + (new_offset - offset) % bins.size(), bins.end());
else
std::rotate(bins.begin(), bins.end() - (offset - new_offset) % bins.size(), bins.end());
offset = new_offset;
}
void centerBins(int new_min_key, int new_max_key)
{
int margins = length() - (new_max_key - new_min_key + 1);
int new_offset = new_min_key - margins / 2;
shiftBins(offset - new_offset);
}
size_t estimatedFloatSize(Float64 value) const
{
// Assuming IEEE 754 double-precision binary floating-point format: binary64
return sizeof(value);
}
};
}