#pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include namespace DB { namespace ErrorCodes { extern const int LOGICAL_ERROR; } } /// Sets denominator type. enum class DenominatorMode : uint8_t { Compact, /// Compact denominator. StableIfBig, /// Stable denominator falling back to Compact if rank storage is not big enough. ExactType /// Denominator of specified exact type. }; namespace details { /// Look-up table of logarithms for integer numbers, used in HyperLogLogCounter. template struct LogLUT { LogLUT() { log_table[0] = 0.0; for (size_t i = 1; i <= M; ++i) log_table[i] = log(static_cast(i)); } double getLog(size_t x) const { if (x <= M) return log_table[x]; return log(static_cast(x)); } private: static constexpr size_t M = 1 << ((static_cast(K) <= 12) ? K : 12); double log_table[M + 1]; }; template struct MinCounterTypeHelper; template <> struct MinCounterTypeHelper<0> { using Type = UInt8; }; template <> struct MinCounterTypeHelper<1> { using Type = UInt16; }; template <> struct MinCounterTypeHelper<2> { using Type = UInt32; }; template <> struct MinCounterTypeHelper<3> { using Type = UInt64; }; /// Auxiliary structure for automatic determining minimum size of counter's type depending on its maximum value. /// Used in HyperLogLogCounter in order to spend memory efficiently. template struct MinCounterType { using Type = typename MinCounterTypeHelper< (MaxValue >= 1 << 8) + (MaxValue >= 1 << 16) + (MaxValue >= 1ULL << 32) >::Type; }; /// Denominator of expression for HyperLogLog algorithm. template class Denominator; /// Returns true if rank storage is big. constexpr bool isBigRankStore(UInt8 precision) { return precision >= 12; } /// Used to deduce denominator type depending on options provided. template struct IntermediateDenominator; template requires (denominator_mode != DenominatorMode::ExactType) struct IntermediateDenominator { using Type = double; }; template struct IntermediateDenominator { using Type = long double; }; template struct IntermediateDenominator { using Type = DenominatorType; }; /// "Lightweight" implementation of expression's denominator for HyperLogLog algorithm. /// Uses minimum amount of memory, but estimates may be unstable. /// Satisfiable when rank storage is small enough. template requires (!details::isBigRankStore(precision)) || (!(denominator_mode == DenominatorMode::StableIfBig)) class __attribute__((__packed__)) Denominator { private: using T = typename IntermediateDenominator::Type; public: Denominator(DenominatorType initial_value) /// NOLINT : denominator(initial_value) { } void update(UInt8 cur_rank, UInt8 new_rank) { denominator -= static_cast(1.0) / (1ULL << cur_rank); denominator += static_cast(1.0) / (1ULL << new_rank); } void update(UInt8 rank) { denominator += static_cast(1.0) / (1ULL << rank); } void clear() { denominator = 0; } DenominatorType get() const { return denominator; } private: T denominator; }; /// Fully-functional version of expression's denominator for HyperLogLog algorithm. /// Spends more space that lightweight version. Estimates will always be stable. /// Used when rank storage is big. template requires (details::isBigRankStore(precision)) && (denominator_mode == DenominatorMode::StableIfBig) class __attribute__((__packed__)) Denominator { public: Denominator(DenominatorType initial_value) /// NOLINT { rank_count[0] = static_cast(initial_value); } void update(UInt8 cur_rank, UInt8 new_rank) { --rank_count[cur_rank]; ++rank_count[new_rank]; } void update(UInt8 rank) { ++rank_count[rank]; } void clear() { memset(rank_count, 0, size * sizeof(UInt32)); } DenominatorType get() const { long double val = rank_count[size - 1]; for (int i = size - 2; i >= 0; --i) { val /= 2.0; val += rank_count[i]; } return static_cast(val); } private: static constexpr size_t size = max_rank + 1; UInt32 rank_count[size] = { 0 }; }; /// Number of trailing zeros. template struct TrailingZerosCounter; template <> struct TrailingZerosCounter { static int apply(UInt32 val) { return std::countr_zero(val); } }; template <> struct TrailingZerosCounter { static int apply(UInt64 val) { return std::countr_zero(val); } }; /// Size of counter's rank in bits. template struct RankWidth; template <> struct RankWidth { static constexpr UInt8 get() { return 5; } }; template <> struct RankWidth { static constexpr UInt8 get() { return 6; } }; } /// Sets behavior of HyperLogLog class. enum class HyperLogLogMode : uint8_t { Raw, /// No error correction. LinearCounting, /// LinearCounting error correction. BiasCorrected, /// HyperLogLog++ error correction. FullFeatured /// LinearCounting or HyperLogLog++ error correction (depending). }; /// Estimation of number of unique values using HyperLogLog algorithm. /// /// Theoretical relative error is ~1.04 / sqrt(2^precision), where /// precision is size of prefix of hash-function used for indexing (number of buckets M = 2^precision). /// Recommended values for precision are: 3..20. /// /// Source: "HyperLogLog: The analysis of a near-optimal cardinality estimation algorithm" /// (P. Flajolet et al., AOFA '07: Proceedings of the 2007 International Conference on Analysis /// of Algorithms). template < UInt8 precision, typename Key = UInt64, typename Hash = IntHash32, typename HashValueType = UInt32, typename DenominatorType = double, typename BiasEstimator = TrivialBiasEstimator, HyperLogLogMode mode = HyperLogLogMode::FullFeatured, DenominatorMode denominator_mode = DenominatorMode::StableIfBig> class HyperLogLogCounter : private Hash { private: /// Number of buckets. static constexpr size_t bucket_count = 1ULL << precision; /// Size of counter's rank in bits. static constexpr UInt8 rank_width = details::RankWidth::get(); using Value = UInt64; using RankStore = DB::CompactArray; public: using value_type = Value; /// ALWAYS_INLINE is required to have better code layout for uniqCombined function void ALWAYS_INLINE insert(Value value) { HashValueType hash = getHash(value); /// Divide hash to two sub-values. First is bucket number, second will be used to calculate rank. HashValueType bucket = extractBitSequence(hash, 0, precision); HashValueType tail = extractBitSequence(hash, precision, sizeof(HashValueType) * 8); UInt8 rank = calculateRank(tail); /// Update maximum rank for current bucket. update(bucket, rank); } UInt64 size() const { /// Normalizing factor for harmonic mean. static constexpr double alpha_m = bucket_count == 2 ? 0.351 : bucket_count == 4 ? 0.532 : bucket_count == 8 ? 0.626 : bucket_count == 16 ? 0.673 : bucket_count == 32 ? 0.697 : bucket_count == 64 ? 0.709 : 0.7213 / (1 + 1.079 / bucket_count); /// Harmonic mean for all buckets of 2^rank values is: bucket_count / ∑ 2^-rank_i, /// where ∑ 2^-rank_i - is denominator. double raw_estimate = alpha_m * bucket_count * bucket_count / denominator.get(); double final_estimate = fixRawEstimate(raw_estimate); return static_cast(final_estimate + 0.5); /// NOLINT } void merge(const HyperLogLogCounter & rhs) { const auto & rhs_rank_store = rhs.rank_store; for (HashValueType bucket = 0; bucket < bucket_count; ++bucket) update(bucket, rhs_rank_store[bucket]); } void read(DB::ReadBuffer & in) { if constexpr (std::endian::native == std::endian::little) in.readStrict(reinterpret_cast(this), sizeof(*this)); else { in.readStrict(reinterpret_cast(&rank_store), sizeof(RankStore)); constexpr size_t denom_size = sizeof(DenominatorCalculatorType); std::array denominator_copy; in.readStrict(denominator_copy.begin(), denom_size); for (size_t i = 0; i < denominator_copy.size(); i += (sizeof(UInt32) / sizeof(char))) { UInt32 * cur = reinterpret_cast(&denominator_copy[i]); DB::transformEndianness(*cur); } memcpy(reinterpret_cast(&denominator), denominator_copy.begin(), denom_size); in.readStrict(reinterpret_cast(&zeros), sizeof(ZerosCounterType)); DB::transformEndianness(zeros); } } static void skip(DB::ReadBuffer & in) { in.ignore(sizeof(RankStore) + sizeof(DenominatorCalculatorType) + sizeof(ZerosCounterType)); } void write(DB::WriteBuffer & out) const { if constexpr (std::endian::native == std::endian::little) out.write(reinterpret_cast(this), sizeof(*this)); else { out.write(reinterpret_cast(&rank_store), sizeof(RankStore)); constexpr size_t denom_size = sizeof(DenominatorCalculatorType); std::array denominator_copy; memcpy(denominator_copy.begin(), reinterpret_cast(&denominator), denom_size); for (size_t i = 0; i < denominator_copy.size(); i += (sizeof(UInt32) / sizeof(char))) { UInt32 * cur = reinterpret_cast(&denominator_copy[i]); DB::transformEndianness(*cur); } out.write(denominator_copy.begin(), denom_size); auto zeros_copy = zeros; DB::transformEndianness(zeros_copy); out.write(reinterpret_cast(&zeros_copy), sizeof(ZerosCounterType)); } } /// Read and write in text mode is suboptimal (but compatible with OLAPServer and Metrage). void readText(DB::ReadBuffer & in) { rank_store.readText(in); zeros = 0; denominator.clear(); for (HashValueType bucket = 0; bucket < bucket_count; ++bucket) { UInt8 rank = rank_store[bucket]; if (rank == 0) ++zeros; denominator.update(rank); } } static void skipText(DB::ReadBuffer & in) { UInt8 dummy; for (size_t i = 0; i < RankStore::size(); ++i) { if (i != 0) DB::assertChar(',', in); DB::readIntText(dummy, in); } } void writeText(DB::WriteBuffer & out) const { rank_store.writeText(out); } private: /// Extract subset of bits in [begin, end[ range. HashValueType extractBitSequence(HashValueType val, UInt8 begin, UInt8 end) const { return (val >> begin) & ((1ULL << (end - begin)) - 1); } /// Rank is number of trailing zeros. UInt8 calculateRank(HashValueType val) const { if (unlikely(val == 0)) return max_rank; auto zeros_plus_one = details::TrailingZerosCounter::apply(val) + 1; if (unlikely(zeros_plus_one) > max_rank) return max_rank; return zeros_plus_one; } HashValueType getHash(Value key) const { /// NOTE: this should be OK, since value is the same as key for HLL. return static_cast( Hash::operator()(static_cast(key))); } /// Update maximum rank for current bucket. /// ALWAYS_INLINE is required to have better code layout for uniqCombined function void ALWAYS_INLINE update(HashValueType bucket, UInt8 rank) { typename RankStore::Locus content = rank_store[bucket]; UInt8 cur_rank = static_cast(content); if (rank > cur_rank) { if (cur_rank == 0) --zeros; denominator.update(cur_rank, rank); content = rank; } } double fixRawEstimate(double raw_estimate) const { if ((mode == HyperLogLogMode::Raw) || ((mode == HyperLogLogMode::BiasCorrected) && BiasEstimator::isTrivial())) return raw_estimate; if (mode == HyperLogLogMode::LinearCounting) return applyLinearCorrection(raw_estimate); if ((mode == HyperLogLogMode::BiasCorrected) && !BiasEstimator::isTrivial()) return applyBiasCorrection(raw_estimate); if (mode == HyperLogLogMode::FullFeatured) { static constexpr double pow2_32 = 4294967296.0; double fixed_estimate; if (raw_estimate > (pow2_32 / 30.0)) fixed_estimate = raw_estimate; else fixed_estimate = applyCorrection(raw_estimate); return fixed_estimate; } throw Poco::Exception("Internal error", DB::ErrorCodes::LOGICAL_ERROR); } double applyCorrection(double raw_estimate) const { double fixed_estimate; if (BiasEstimator::isTrivial()) { if (raw_estimate <= (2.5 * bucket_count)) { /// Correction in case of small estimate. fixed_estimate = applyLinearCorrection(raw_estimate); } else fixed_estimate = raw_estimate; } else { fixed_estimate = applyBiasCorrection(raw_estimate); double linear_estimate = applyLinearCorrection(fixed_estimate); if (linear_estimate < BiasEstimator::getThreshold()) fixed_estimate = linear_estimate; } return fixed_estimate; } /// Correction used in HyperLogLog++ algorithm. /// Source: "HyperLogLog in Practice: Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm" /// (S. Heule et al., Proceedings of the EDBT 2013 Conference). double applyBiasCorrection(double raw_estimate) const { double fixed_estimate; if (raw_estimate <= (5 * bucket_count)) fixed_estimate = raw_estimate - BiasEstimator::getBias(raw_estimate); else fixed_estimate = raw_estimate; return fixed_estimate; } /// Calculation of unique values using LinearCounting algorithm. /// Source: "A Linear-time Probabilistic Counting Algorithm for Database Applications" /// (Whang et al., ACM Trans. Database Syst., pp. 208-229, 1990). double applyLinearCorrection(double raw_estimate) const { double fixed_estimate; if (zeros != 0) fixed_estimate = bucket_count * (log_lut.getLog(bucket_count) - log_lut.getLog(zeros)); else fixed_estimate = raw_estimate; return fixed_estimate; } static constexpr int max_rank = sizeof(HashValueType) * 8 - precision + 1; RankStore rank_store; /// Expression's denominator for HyperLogLog algorithm. using DenominatorCalculatorType = details::Denominator; DenominatorCalculatorType denominator{bucket_count}; /// Number of zeros in rank storage. using ZerosCounterType = typename details::MinCounterType::Type; ZerosCounterType zeros = bucket_count; static details::LogLUT log_lut; /// Checks. static_assert(precision < (sizeof(HashValueType) * 8), "Invalid parameter value"); }; /// Declaration of static variables for linker. template < UInt8 precision, typename Key, typename Hash, typename HashValueType, typename DenominatorType, typename BiasEstimator, HyperLogLogMode mode, DenominatorMode denominator_mode > details::LogLUT HyperLogLogCounter < precision, Key, Hash, HashValueType, DenominatorType, BiasEstimator, mode, denominator_mode >::log_lut; /// Lightweight implementation of expression's denominator is used in Metrage. /// Serialization format must not be changed. using HLL12 = HyperLogLogCounter< 12, UInt64, IntHash32, UInt32, double, TrivialBiasEstimator, HyperLogLogMode::FullFeatured, DenominatorMode::Compact >;